> ## Documentation Index
> Fetch the complete documentation index at: https://docs.open-metadata.org/llms.txt
> Use this file to discover all available pages before exploring further.

# KafkaConnect | OpenMetadata Messaging Pipeline Connector

> Configure Kafka Connect for metadata ingestion from real-time event streams, schema updates, and topic usage.

export const MetadataIngestionUi = ({connector, selectServicePath, addNewServicePath, serviceConnectionPath}) => {
  return <>
    <p>
      To ingest metadata from your sources, you need to create a service connection.
      The service connects your source system with OpenMetadata. Once you create
      a service, you can use it to configure your ingestion workflows.<br />
      <br />
      To create a service connection and ingest your metadata, follow the steps below:
    </p>
      <Steps>
      <Step title="Select the Service">
        <ol>
          <li>
            On the left navigation bar, click <strong>Settings</strong>.
          </li>
          <li>
            On the next page, click <strong>Services</strong>, and then select the service.
            <img src="/public/images/connectors/visit-services-page.png" alt="Visit Services Page" />
          </li>
        </ol>
      </Step>

      <Step title="Create a New Service">
        To add a new service connection, click <strong>Add New Service</strong>.
        <img src="/public/images/connectors/create-new-service.png" alt="Create a new Service" />
      </Step>

      <Step title="Select the Connector">
        Select <strong>{connector}</strong> as the service type and click <strong>Next</strong>.

        {selectServicePath && <img src={selectServicePath} alt="Select Service" />}
      </Step>

      <Step title="Name and Describe the Service">
        Enter a unique <strong>Service Name</strong> and <strong>Description</strong>.
        <ul>
         <li><strong>Service Name</strong>: OpenMetadata identifies services by their service name. Enter a name that distinguishes this deployment from other services, including other {connector} services you are ingesting metadata from.</li>
        </ul>

        <Note>
          The service name cannot be changed after it is set.
       </Note>

        {addNewServicePath && <img src={addNewServicePath} alt="Add New Service" />}
      </Step>

      <Step title="Configure the Service Connection">
        Set up the connection settings required for {connector} to set up the service and start ingesting metadata from your sources. The right-hand panel displays help documentation for the selected connection type in the product UI.
        {serviceConnectionPath && <img src={serviceConnectionPath} alt="Configure Service connection" />}
      </Step>
    </Steps>
  </>;
};

export const ConnectorDetailsHeader = ({name, icon, stage, availableFeatures, unavailableFeatures = [], availableFeaturesCollate = []}) => {
  const showSubHeading = availableFeatures?.length > 0 || unavailableFeatures?.length > 0 || availableFeaturesCollate?.length > 0;
  const totalAvailableFeatures = [...availableFeatures || [], ...availableFeaturesCollate || []];
  return <div className="container">
      <div className="Heading">
        <div className="flex items-center gap-3">
          {icon && <div className="IconContainer">
              <img src={icon} alt={name} noZoom className="ConnectorIcon" />
            </div>}
          <h1 className="ConnectorName">{name}</h1>
          <span className={`StageBadge ${stage === 'PROD' ? 'prod' : 'beta'}`}>
            {stage}
          </span>
        </div>
      </div>
      {showSubHeading && <div className="SubHeading">
          <div className="FeaturesHeading">Feature List</div>
          <div className="FeaturesList">
            {totalAvailableFeatures.map(feature => <div className="FeatureTag AvailableFeature" key={feature}>
                ✓ {feature}
              </div>)}
            {unavailableFeatures.map(feature => <div className="FeatureTag UnavailableFeature" key={feature}>
                ✕ {feature}
              </div>)}
          </div>
        </div>}
    </div>;
};

<ConnectorDetailsHeader icon="/public/images/connectors/kafka.webp" name="KafkaConnect" stage="PROD" availableFeatures={["Pipelines", "Pipeline Status", "Lineage", "Usage"]} unavailableFeatures={["Owners", "Tags"]} />

In this section, we provide guides and references to use the KafkaConnect connector.
Configure and schedule KafkaConnect metadata and profiler workflows from the OpenMetadata UI:

* [Requirements](#requirements)
  * [KafkaConnect Versions](#kafkaconnect-versions)
* [Metadata Ingestion](#metadata-ingestion)
  * [Service Name](#service-name)
  * [Connection Details](#connection-details)
  * [Metadata Ingestion Options](#metadata-ingestion-options)
* [Troubleshooting](/v1.12.x/connectors/pipeline/glue-pipeline/troubleshooting)
  * [Workflow Deployment Error](#workflow-deployment-error)

## Requirements

### KafkaConnect Versions

OpenMetadata is integrated with kafkaconnect up to version [3.6.1](https://docs.kafkaconnect.io/getting-started) and will continue to work for future kafkaconnect versions.
The ingestion framework uses [kafkaconnect python client](https://libraries.io/pypi/kafka-connect-py) to connect to the kafkaconnect instance and perform the API calls

## Metadata Ingestion

<MetadataIngestionUi connector={"KafkaConnect"} selectServicePath={"/public/images/connectors/kafkaconnect/select-service.png"} addNewServicePath={"/public/images/connectors/kafkaconnect/add-new-service.png"} serviceConnectionPath={"/public/images/connectors/kafkaconnect/service-connection.png"} />

# Connection Details

<Steps>
  <Step title="Connection Details">
    <Tip>
      When using a **Hybrid Ingestion Runner**, any sensitive credential fields—such as passwords, API keys, or private keys—must reference secrets using the following format:

      ```
      password: secret:/my/database/password
      ```

      This applies **only to fields marked as secrets** in the connection form (these typically mask input and show a visibility toggle icon).
      For a complete guide on managing secrets in hybrid setups, see the [Hybrid Ingestion Runner Secret Management Guide](https://docs.getcollate.io/getting-started/day-1/hybrid-saas/hybrid-ingestion-runner#3.-manage-secrets-securely).
    </Tip>

    * **Host and Port**: The hostname or IP address of the Kafka Connect worker with the REST API enabled eg.`https://localhost:8083` or `https://127.0.0.1:8083` or `https://<yourkafkaconnectresthostnamehere>`
    * **Kafka Connect Config**: OpenMetadata supports username/password.
      1. Basic Authentication
         * Username: Username to connect to Kafka Connect. This user should be able to send request to the Kafka Connect API and access the [Rest API](https://docs.confluent.io/platform/current/connect/references/restapi.html) GET endpoints.
         * Password: Password to connect to Kafka Connect.
    * **verifySSL** : Whether SSL verification should be perform when authenticating.
    * **Kafka Service Name** : The Service Name of the Ingested [Kafka](/v1.12.x/connectors/messaging/kafka#4.-name-and-describe-your-service) instance associated with this KafkaConnect instance.
  </Step>

  <Step title="Test the Connection">
    Once the credentials have been added, click on *Test Connection* and *Save* the changes.

    <img src="https://mintcdn.com/openmetadata/9G75p72jJKYgvFUQ/public/images/connectors/test-connection.png?fit=max&auto=format&n=9G75p72jJKYgvFUQ&q=85&s=4ac71a56e30fa3dd1be86f82c1f07068" alt="Test Connection" width="1494" height="310" data-path="public/images/connectors/test-connection.png" />
  </Step>

  <Step title="Configure Metadata Ingestion">
    In this step we will configure the metadata ingestion pipeline,
    Please follow the instructions below

    <img src="https://mintcdn.com/openmetadata/9SXjaLbGROaofLQU/public/images/connectors/configure-metadata-ingestion-pipeline.png?fit=max&auto=format&n=9SXjaLbGROaofLQU&q=85&s=07da52ea4660beae30246814bb1c55c2" alt="Configure Metadata Ingestion" width="1508" height="1614" data-path="public/images/connectors/configure-metadata-ingestion-pipeline.png" />

    #### Metadata Ingestion Options

    * **Name**: This field refers to the name of ingestion pipeline, you can customize the name or use the generated name.
    * **Pipeline Filter Pattern (Optional)**: Use to pipeline filter patterns to control whether or not to include pipeline as part of metadata ingestion.
      * **Include**: Explicitly include pipeline by adding a list of comma-separated regular expressions to the Include field. OpenMetadata will include all pipeline with names matching one or more of the supplied regular expressions. All other schemas will be excluded.
      * **Exclude**: Explicitly exclude pipeline by adding a list of comma-separated regular expressions to the Exclude field. OpenMetadata will exclude all pipeline with names matching one or more of the supplied regular expressions. All other schemas will be included.
    * **Include lineage (toggle)**: Set the Include lineage toggle to control whether to include lineage between pipelines and data sources as part of metadata ingestion.
    * **Enable Debug Log (toggle)**: Set the Enable Debug Log toggle to set the default log level to debug.
    * **Mark Deleted Pipelines (toggle)**: Set the Mark Deleted Pipelines toggle to flag pipelines as soft-deleted if they are not present anymore in the source system.
  </Step>

  <Step title="Schedule the Ingestion and Deploy">
    Scheduling can be set up at an hourly, daily, weekly, or manual cadence. The
    timezone is in UTC. Select a Start Date to schedule for ingestion. It is
    optional to add an End Date.

    Review your configuration settings. If they match what you intended,
    click Deploy to create the service and schedule metadata ingestion.

    If something doesn't look right, click the Back button to return to the
    appropriate step and change the settings as needed.

    After configuring the workflow, you can click on Deploy to create the
    pipeline.

    <img src="https://mintcdn.com/openmetadata/j50Bw6ZBiFbbFFnF/public/images/connectors/schedule.png?fit=max&auto=format&n=j50Bw6ZBiFbbFFnF&q=85&s=24b0c2f55f803efde5fb3b3bc24ed3ae" alt="Schedule the Workflow" width="2733" height="1083" data-path="public/images/connectors/schedule.png" />
  </Step>

  <Step title="View the Ingestion Pipeline">
    Once the workflow has been successfully deployed, you can view the
    Ingestion Pipeline running from the Service Page.

    <img src="https://mintcdn.com/openmetadata/9G75p72jJKYgvFUQ/public/images/connectors/view-ingestion-pipeline.png?fit=max&auto=format&n=9G75p72jJKYgvFUQ&q=85&s=7c4e411977371617cb1312efb9f9bfee" alt="View Ingestion Pipeline" width="2733" height="1271" data-path="public/images/connectors/view-ingestion-pipeline.png" />

    <Tip>
      If AutoPilot is enabled, workflows like usage tracking, data lineage, and similar tasks will be handled automatically. Users don’t need to set up or manage them - AutoPilot takes care of everything in the system.
    </Tip>
  </Step>
</Steps>

## Debezium CDC Support

The KafkaConnect connector provides **full support for Debezium CDC connectors** with intelligent column extraction and accurate lineage tracking.

### What We Provide

When you ingest Debezium connectors, OpenMetadata automatically:

1. **Detects CDC Envelope Structures** - Identifies Debezium's CDC format with `op`, `before`, and `after` fields
2. **Extracts Real Table Columns** - Parses actual database columns from the CDC payload instead of CDC envelope metadata
3. **Creates Accurate Column-Level Lineage** - Maps lineage from source database tables → Kafka topics → target systems

### Recognized Configuration Parameters

OpenMetadata recognizes the following Debezium configuration parameters for intelligent CDC detection:

* `database.server.name` - Server identifier (Debezium V1)
* `topic.prefix` - Topic prefix (Debezium V2)
* `table.include.list` - Tables to capture (e.g., `mydb.customers,mydb.orders`)

<img src="https://mintcdn.com/openmetadata/op1zUgmsxrC0rdlj/public/images/connectors/kafkaconnect/lineage.png?fit=max&auto=format&n=op1zUgmsxrC0rdlj&q=85&s=aaeae2932d4b1cfcbeac26ed180a32ff" alt="Kafkaconnect Lineage" width="2570" height="722" data-path="public/images/connectors/kafkaconnect/lineage.png" />

## Supported Connectors

Currently, the following source and sink connectors for Kafka Connect are supported for lineage tracking:

* [MySQL](/v1.12.x/connectors/database/mysql)
* [PostgreSQL](/v1.12.x/connectors/database/postgres)
* [MSSQL](/v1.12.x/connectors/database/mssql)
* [MongoDB](/v1.12.x/connectors/database/mongodb)
* [Amazon S3](/v1.12.x/connectors/storage/s3)
  For these connectors, lineage information can be obtained provided they are configured with a source or sink and the corresponding metadata ingestion is enabled.
  **Note:** All supported database connectors listed above work seamlessly with **Debezium CDC connectors** for enhanced column-level lineage tracking. When using Debezium, OpenMetadata automatically detects the CDC envelope structure and extracts actual table columns for accurate lineage mapping.

### Missing Lineage

If lineage information is not displayed for a Kafka Connect service, follow these steps to diagnose the issue.

1. *Kafka Service Association*: Make sure the Kafka service that the data is being ingested from is associated with this Kafka Connect service. Additionally, verify that the correct name is passed on in the Kafka Service Name field during configuration. This field helps establish the lineage between the Kafka service and the Kafka Connect flow.
2. *Source and Sink Configuration*: Verify that the Kafka Connect connector associated with the service is configured with a source and/or sink database or storage system. Connectors without a defined source or sink cannot provide lineage data.
3. *Metadata Ingestion*: Ensure that metadata for both the source and sink database/storage systems is ingested and passed to the lineage system. This typically involves configuring the relevant connectors to capture and transmit this information.
