> ## Documentation Index
> Fetch the complete documentation index at: https://docs.open-metadata.org/llms.txt
> Use this file to discover all available pages before exploring further.

# Run the KafkaConnect Connector Externally

> Set up Kafka Connect YAML configuration to stream metadata and schema updates from real-time Kafka topics into your catalog.

export const CodePanel = ({children, fileName = 'config.yaml', showLineNumbers = false}) => {
  const codePanelRef = useRef(null);
  const codeContentRef = useRef(null);
  const isProgrammaticScroll = useRef(false);
  const hoverTimeout = useRef(null);
  useEffect(() => {
    let tries = 0;
    const wrapLines = () => {
      const root = codeContentRef.current;
      if (!root) return;
      const pres = Array.from(root.querySelectorAll('pre'));
      if (!pres.length) {
        if (tries++ < 20) requestAnimationFrame(wrapLines);
        return;
      }
      let globalLine = 1;
      pres.forEach(pre => {
        const code = pre.querySelector('code') || pre;
        if (!code || code.dataset.wrapped === 'true') return;
        const raw = code.textContent || '';
        let lines = raw.split('\n');
        while (lines[0] === '') lines.shift();
        while (lines[lines.length - 1] === '') lines.pop();
        code.innerHTML = lines.map(line => {
          const ln = globalLine++;
          const num = showLineNumbers ? `<span class="line-number">${ln}</span>` : '';
          const safe = line.replace(/</g, '&lt;').replace(/>/g, '&gt;') || ' ';
          return `<span class="code-line" data-line="${ln}">${num}${safe}</span>`;
        }).join('');
        code.dataset.wrapped = 'true';
      });
    };
    wrapLines();
  }, [children, showLineNumbers]);
  useEffect(() => {
    const panel = codePanelRef.current;
    const content = codeContentRef.current;
    if (!panel || !content) return;
    const waitForLines = () => {
      const codeLines = content.querySelectorAll('.code-line');
      if (!codeLines.length) {
        requestAnimationFrame(waitForLines);
        return;
      }
      setupHighlighting(codeLines);
    };
    const setupHighlighting = codeLines => {
      const layout = panel.closest('.split-layout');
      const sections = layout.querySelectorAll('.content-section');
      const parseLines = str => {
        if (!str) return [];
        const out = [];
        str.split(',').forEach(p => {
          if (p.includes('-')) {
            const [s, e] = p.split('-').map(Number);
            for (let i = s; i <= e; i++) out.push(i);
          } else {
            const n = Number(p);
            if (!isNaN(n)) out.push(n);
          }
        });
        return out;
      };
      const clearHighlight = () => {
        codeLines.forEach(l => l.classList.remove('highlighted'));
      };
      const highlight = lines => {
        clearHighlight();
        lines.forEach(n => {
          const el = content.querySelector(`.code-line[data-line="${n}"]`);
          if (el) el.classList.add('highlighted');
        });
      };
      const scrollToLines = lines => {
        if (!lines.length) return;
        const first = lines[0];
        const targetLine = lines.length > 1 ? first : lines[0];
        const el = content.querySelector(`.code-line[data-line="${targetLine}"]`);
        if (!el) return;
        isProgrammaticScroll.current = true;
        const containerRect = content.getBoundingClientRect();
        const elRect = el.getBoundingClientRect();
        const offset = elRect.top - containerRect.top + content.scrollTop;
        const TOP_PADDING = 16;
        content.scrollTo({
          top: Math.max(offset - TOP_PADDING, 0),
          behavior: 'smooth'
        });
        setTimeout(() => {
          isProgrammaticScroll.current = false;
        }, 200);
      };
      const activate = (section, scroll) => {
        if (section.classList.contains('active')) return;
        sections.forEach(s => s.classList.remove('active'));
        section.classList.add('active');
        const lines = parseLines(section.dataset.lines);
        highlight(lines);
        if (scroll) scrollToLines(lines);
      };
      const observer = new IntersectionObserver(entries => {
        if (isProgrammaticScroll.current) return;
        entries.forEach(e => {
          if (e.isIntersecting) activate(e.target, false);
        });
      }, {
        threshold: 0.3,
        rootMargin: '-80px 0px -40% 0px'
      });
      sections.forEach(section => {
        observer.observe(section);
        section.addEventListener('click', () => activate(section, true));
        section.addEventListener('mouseenter', () => {
          clearTimeout(hoverTimeout.current);
          hoverTimeout.current = setTimeout(() => activate(section, true), 80);
        });
      });
      if (sections[0]) activate(sections[0], false);
    };
    waitForLines();
  }, []);
  const handleCopy = e => {
    const btn = e.currentTarget;
    const codeLines = codeContentRef.current?.querySelectorAll('.code-line');
    if (!codeLines || codeLines.length === 0) return;
    const text = Array.from(codeLines).map(line => {
      const clone = line.cloneNode(true);
      const lineNumber = clone.querySelector('.line-number');
      if (lineNumber) lineNumber.remove();
      return clone.textContent;
    }).join('\n');
    if (!text) return;
    navigator.clipboard.writeText(text).then(() => {
      btn.dataset.copied = 'true';
      setTimeout(() => btn.dataset.copied = 'false', 1500);
    });
  };
  return <div className="code-panel" ref={codePanelRef}>
      <div className="code-header">
        {fileName}
        <button className="copy-btn" aria-label="Copy full code" data-copied="false" onClick={handleCopy}>
          <svg className="icon-copy" viewBox="0 0 15 16" fill="currentColor">
            <path d="M10.113 3.124H2.205C1.463 3.124.86 3.655.86 4.31v10.005c0 .654.603 1.186 1.345 1.186h7.908c.742 0 1.345-.532 1.345-1.186V4.31c0-.655-.606-1.186-1.345-1.186Z" />
            <path d="M13.138.5H5.229c-.742 0-1.344.531-1.344 1.186 0 .23.209.414.47.414s.47-.184.47-.414c0-.197.182-.357.404-.357h7.909c.223 0 .404.16.404.357V11.69c0 .196-.181.356-.404.356-.262 0-.47.184-.47.415 0 .23.208.415.47.415.742 0 1.344-.532 1.344-1.186V1.686C14.482 1.03 13.88.5 13.138.5Z" />
          </svg>

          <svg className="icon-check" viewBox="0 0 20 20" fill="currentColor">
            <path fillRule="evenodd" d="M16.707 5.293a1 1 0 010 1.414l-7.25 7.25a1 1 0 01-1.414 0l-3.25-3.25a1 1 0 011.414-1.414l2.543 2.543 6.543-6.543a1 1 0 011.414 0z" clipRule="evenodd" />
          </svg>
        </button>
      </div>

      <div className="code-content" ref={codeContentRef}>
        {children}
      </div>
    </div>;
};

export const ContentSection = ({id, title, lines, children}) => <div className="content-section" data-content-id={id} data-lines={lines}>
    {title && <h4>{title}</h4>}
    {children}
  </div>;

export const ContentPanel = ({children}) => <div className="content-panel">{children}</div>;

export const CodePreview = ({children}) => {
  const [instanceId] = useState(() => `preview-${Math.random().toString(36).slice(2)}`);
  useEffect(() => {
    const nav = document.querySelector('nav') || document.querySelector('header') || document.querySelector('[class*="nav"]');
    if (nav) {
      document.documentElement.style.setProperty('--navbar-height', `${nav.offsetHeight}px`);
    }
  }, []);
  return <div className="split-layout" data-preview-id={instanceId}>
      {children}
    </div>;
};

export const ConnectorDetailsHeader = ({name, icon, stage, availableFeatures, unavailableFeatures = [], availableFeaturesCollate = []}) => {
  const showSubHeading = availableFeatures?.length > 0 || unavailableFeatures?.length > 0 || availableFeaturesCollate?.length > 0;
  const totalAvailableFeatures = [...availableFeatures || [], ...availableFeaturesCollate || []];
  return <div className="container">
      <div className="Heading">
        <div className="flex items-center gap-3">
          {icon && <div className="IconContainer">
              <img src={icon} alt={name} noZoom className="ConnectorIcon" />
            </div>}
          <h1 className="ConnectorName">{name}</h1>
          <span className={`StageBadge ${stage === 'PROD' ? 'prod' : 'beta'}`}>
            {stage}
          </span>
        </div>
      </div>
      {showSubHeading && <div className="SubHeading">
          <div className="FeaturesHeading">Feature List</div>
          <div className="FeaturesList">
            {totalAvailableFeatures.map(feature => <div className="FeatureTag AvailableFeature" key={feature}>
                ✓ {feature}
              </div>)}
            {unavailableFeatures.map(feature => <div className="FeatureTag UnavailableFeature" key={feature}>
                ✕ {feature}
              </div>)}
          </div>
        </div>}
    </div>;
};

<ConnectorDetailsHeader icon="/public/images/connectors/kafka.webp" name="KafkaConnect" stage="PROD" availableFeatures={["Pipelines", "Pipeline Status", "Tags", "Usage"]} unavailableFeatures={["Owners", "Lineage"]} />

In this section, we provide guides and references to use the KafkaConnect connector.
Configure and schedule KafkaConnect metadata and profiler workflows from the OpenMetadata UI:

* [Requirements](#requirements)
* [Metadata Ingestion](#metadata-ingestion)

## How to Run the Connector Externally

To run the Ingestion via the UI you'll need to use the OpenMetadata Ingestion Container, which comes shipped with
custom Airflow plugins to handle the workflow deployment.

If, instead, you want to manage your workflows externally on your preferred orchestrator, you can check
the following docs to run the Ingestion Framework **anywhere**.

<Columns cols={2}>
  <Card title="External Schedulers" href="/v1.12.x/deployment/ingestion">
    Get more information about running the Ingestion Framework Externally
  </Card>
</Columns>

## Requirements

### Python Requirements

<Tip>
  We have support for Python versions **3.9-3.11**
</Tip>

To run the KafkaConnect ingestion, you will need to install:

```bash theme={null}
pip3 install "openmetadata-ingestion[kafkaconnect]"
```

## Metadata Ingestion

All connectors are defined as JSON Schemas.
[Here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/kafkaConnectConnection.json)
you can find the structure to create a connection to KafkaConnect.
In order to create and run a Metadata Ingestion workflow, we will follow
the steps to create a YAML configuration able to connect to the source,
process the Entities if needed, and reach the OpenMetadata server.
The workflow is modeled around the following
[JSON Schema](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json)

### 1. Define the YAML Config

This is a sample config for KafkaConnect:

<CodePreview>
  <ContentPanel>
    <ContentSection id={1} title="hostPort" lines="7">
      **hostPort**: The hostname or IP address of the Kafka Connect worker with the REST API enabled
    </ContentSection>

    <ContentSection id={2} title="verifySSL" lines="8">
      **verifySSL**: Whether SSL verification should be perform when authenticating.
    </ContentSection>

    <ContentSection id={3} title="Kafka Connect Config" lines="8">
      **Kafka Connect Config**: OpenMetadata supports username/password or no Authentication.
      *Basic Authentication*

      * Username: Username to connect to Kafka Connect. This user should be able to send request to the Kafka Connect API and access the [Rest API](https://docs.confluent.io/platform/current/connect/references/restapi.html) GET endpoints.
      * Password: Password to connect to Kafka Connect.
    </ContentSection>

    <ContentSection id={4} title="messagingServiceName" lines="12">
      **messagingServiceName**: Name of the Kafka Messaging Service associated with this KafkaConnect Pipeline Service. e.g. local\_kafka.
    </ContentSection>

    <ContentSection id={2} title="SourceConfig" lines="13-32">
      #### Source Configuration - Source Config

      The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/pipelineServiceMetadataPipeline.json):

      <div>
        **dbServiceNames**: Database Service Name for the creation of lineage, if the source supports it.
      </div>

      <div>
        **includeTags**: Set the 'Include Tags' toggle to control whether to include tags as part of metadata ingestion.
      </div>

      <div>
        **includeUnDeployedPipelines**: Set the 'Include UnDeployed Pipelines' toggle to control whether to include un-deployed pipelines as part of metadata ingestion. By default it is set to `true`
      </div>

      <div>
        **markDeletedPipelines**: Set the Mark Deleted Pipelines toggle to flag pipelines as soft-deleted if they are not present anymore in the source system.
      </div>

      <div>
        **pipelineFilterPattern** and **chartFilterPattern**: Note that the `pipelineFilterPattern` and `chartFilterPattern` both support regex as include or exclude.
      </div>

      <div>
        **includeOwners**: Set the 'Include Owners' toggle to control whether to include owners to the ingested entity if the owner email matches with a user stored in the OM server as part of metadata ingestion. If the ingested entity already exists and has an owner, the owner will not be overwritten.It supports boolean values either `true` or `false`.
      </div>

      <div>
        **overrideLineage**: Set the 'Override Lineage' toggle to control whether to override the existing lineage. It supports boolean values either `true` or `false`.
      </div>

      <div>
        **overrideMetadata**: Set the 'Override Metadata' toggle to control whether to override the existing metadata in the OpenMetadata server with the metadata fetched from the source. If the toggle is set to true, the metadata fetched from the source will override the existing metadata in the OpenMetadata server. If the toggle is set to false, the metadata fetched from the source will not override the existing metadata in the OpenMetadata server. This is applicable for fields like description, tags, owner and displayName. It supports boolean values either `true` or `false`.
      </div>
    </ContentSection>

    To send the metadata to OpenMetadata, it needs to be specified as `type: metadata-rest`.

    <div>
      The main property here is the `openMetadataServerConfig`, where you can define the host and security provider of your OpenMetadata installation.
    </div>

    <div>
      **Logger Level**

      You can specify the `loggerLevel` depending on your needs. If you are trying to troubleshoot an ingestion, running with `DEBUG` will give you far more traces for identifying issues.
    </div>

    <div>
      **JWT Token**

      JWT tokens will allow your clients to authenticate against the OpenMetadata server. To enable JWT Tokens, you will get more details [here](/deployment/security/enable-jwt-tokens).

      You can refer to the JWT Troubleshooting section [link](/deployment/security/jwt-troubleshooting) for any issues in your JWT configuration.
    </div>

    <div>
      **Store Service Connection**

      If set to `true` (default), we will store the sensitive information either encrypted via the Fernet Key in the database or externally, if you have configured any [Secrets Manager](/deployment/secrets-manager).

      If set to `false`, the service will be created, but the service connection information will only be used by the Ingestion Framework at runtime, and won't be sent to the OpenMetadata server.
    </div>

    <div>
      **SSL Configuration**

      If you have added SSL to the [OpenMetadata server](/deployment/security/enable-ssl), then you will need to handle the certificates when running the ingestion too. You can either set `verifySSL` to `ignore`, or have it as `validate`, which will require you to set the `sslConfig.caCertificate` with a local path where your ingestion runs that points to the server certificate file.

      Find more information on how to troubleshoot SSL issues [here](/deployment/security/enable-ssl/ssl-troubleshooting).
    </div>

    <div>
      **ingestionPipelineFQN**

      Fully qualified name of ingestion pipeline, used to identify the current ingestion pipeline.
    </div>
  </ContentPanel>

  <CodePanel fileName="connector_config.yaml">
    ```yaml theme={null}
    source:
      type: kafkaconnect
      serviceName: kafka_connect_source
      serviceConnection:
        config:
          type: KafkaConnect
          hostPort: "https://<yourkafkaconnectresturihere>" # or http://localhost:8083 or http://127.0.0.1:8083
          verifySSL: true
          authType:
            username: username
            password: password
          # messagingServiceName: ""
    ```

    ```yaml theme={null}
      sourceConfig:
        config:
          type: PipelineMetadata
          # lineageInformation:
          #   dbServiceNames: []
          #   storageServiceNames: []
          # markDeletedPipelines: True
          # includeTags: True
          # includeLineage: true
          # includeUnDeployedPipelines: true
          # pipelineFilterPattern:
          #   includes:
          #     - pipeline1
          #     - pipeline2
          #   excludes:
          #     - pipeline3
          #     - pipeline4
          # includeOwners: true # false
          # overrideLineage: false # true
          # overrideMetadata: false # true
    ```

    ```yaml theme={null}
    sink:
      type: metadata-rest
      config: {}
    ```

    ```yaml theme={null}
    workflowConfig:
      loggerLevel: INFO  # DEBUG, INFO, WARNING or ERROR
      openMetadataServerConfig:
        hostPort: "http://localhost:8585/api"
        authProvider: openmetadata
        securityConfig:
          jwtToken: "{bot_jwt_token}"
        ## Store the service Connection information
        storeServiceConnection: true  # false
        ## Secrets Manager Configuration
        # secretsManagerProvider: aws, azure or noop
        # secretsManagerLoader: airflow or env
        ## If SSL, fill the following
        # verifySSL: validate  # or ignore
        # sslConfig:
        #   caCertificate: /local/path/to/certificate
    # ingestionPipelineFQN: <service name>.<ingestion name> ## e.g., "my_redshift.metadata"
    ```
  </CodePanel>
</CodePreview>

### 2. Run with the CLI

First, we will need to save the YAML file. Afterward, and with all requirements installed, we can run:

```bash theme={null}
metadata ingest -c <path-to-yaml>
```

Note that from connector to connector, this recipe will always be the same. By updating the YAML configuration,
you will be able to extract metadata from different sources.

## Debezium CDC Support

The KafkaConnect connector provides **full support for Debezium CDC connectors** with intelligent column extraction and accurate lineage tracking.

### What We Provide

When you ingest Debezium connectors, OpenMetadata automatically:

1. **Detects CDC Envelope Structures** - Identifies Debezium's CDC format with `op`, `before`, and `after` fields
2. **Extracts Real Table Columns** - Parses actual database columns from the CDC payload instead of CDC envelope metadata
3. **Creates Accurate Column-Level Lineage** - Maps lineage from source database tables → Kafka topics → target systems

### Recognized Configuration Parameters

OpenMetadata recognizes the following Debezium configuration parameters for intelligent CDC detection:

* `database.server.name` - Server identifier (Debezium V1)
* `topic.prefix` - Topic prefix (Debezium V2)
* `table.include.list` - Tables to capture (e.g., `mydb.customers,mydb.orders`)
