> ## Documentation Index
> Fetch the complete documentation index at: https://docs.open-metadata.org/llms.txt
> Use this file to discover all available pages before exploring further.

# Run the Kafka Connector Externally

> Use YAML to configure Kafka ingestion and capture topic metadata, message structure, and event lineage.

export const CodePanel = ({children, fileName = 'config.yaml', showLineNumbers = false}) => {
  const codePanelRef = useRef(null);
  const codeContentRef = useRef(null);
  const isProgrammaticScroll = useRef(false);
  const hoverTimeout = useRef(null);
  useEffect(() => {
    let tries = 0;
    const wrapLines = () => {
      const root = codeContentRef.current;
      if (!root) return;
      const pres = Array.from(root.querySelectorAll('pre'));
      if (!pres.length) {
        if (tries++ < 20) requestAnimationFrame(wrapLines);
        return;
      }
      let globalLine = 1;
      pres.forEach(pre => {
        const code = pre.querySelector('code') || pre;
        if (!code || code.dataset.wrapped === 'true') return;
        const raw = code.textContent || '';
        let lines = raw.split('\n');
        while (lines[0] === '') lines.shift();
        while (lines[lines.length - 1] === '') lines.pop();
        code.innerHTML = lines.map(line => {
          const ln = globalLine++;
          const num = showLineNumbers ? `<span class="line-number">${ln}</span>` : '';
          const safe = line.replace(/</g, '&lt;').replace(/>/g, '&gt;') || ' ';
          return `<span class="code-line" data-line="${ln}">${num}${safe}</span>`;
        }).join('');
        code.dataset.wrapped = 'true';
      });
    };
    wrapLines();
  }, [children, showLineNumbers]);
  useEffect(() => {
    const panel = codePanelRef.current;
    const content = codeContentRef.current;
    if (!panel || !content) return;
    const waitForLines = () => {
      const codeLines = content.querySelectorAll('.code-line');
      if (!codeLines.length) {
        requestAnimationFrame(waitForLines);
        return;
      }
      setupHighlighting(codeLines);
    };
    const setupHighlighting = codeLines => {
      const layout = panel.closest('.split-layout');
      const sections = layout.querySelectorAll('.content-section');
      const parseLines = str => {
        if (!str) return [];
        const out = [];
        str.split(',').forEach(p => {
          if (p.includes('-')) {
            const [s, e] = p.split('-').map(Number);
            for (let i = s; i <= e; i++) out.push(i);
          } else {
            const n = Number(p);
            if (!isNaN(n)) out.push(n);
          }
        });
        return out;
      };
      const clearHighlight = () => {
        codeLines.forEach(l => l.classList.remove('highlighted'));
      };
      const highlight = lines => {
        clearHighlight();
        lines.forEach(n => {
          const el = content.querySelector(`.code-line[data-line="${n}"]`);
          if (el) el.classList.add('highlighted');
        });
      };
      const scrollToLines = lines => {
        if (!lines.length) return;
        const first = lines[0];
        const targetLine = lines.length > 1 ? first : lines[0];
        const el = content.querySelector(`.code-line[data-line="${targetLine}"]`);
        if (!el) return;
        isProgrammaticScroll.current = true;
        const containerRect = content.getBoundingClientRect();
        const elRect = el.getBoundingClientRect();
        const offset = elRect.top - containerRect.top + content.scrollTop;
        const TOP_PADDING = 16;
        content.scrollTo({
          top: Math.max(offset - TOP_PADDING, 0),
          behavior: 'smooth'
        });
        setTimeout(() => {
          isProgrammaticScroll.current = false;
        }, 200);
      };
      const activate = (section, scroll) => {
        if (section.classList.contains('active')) return;
        sections.forEach(s => s.classList.remove('active'));
        section.classList.add('active');
        const lines = parseLines(section.dataset.lines);
        highlight(lines);
        if (scroll) scrollToLines(lines);
      };
      const observer = new IntersectionObserver(entries => {
        if (isProgrammaticScroll.current) return;
        entries.forEach(e => {
          if (e.isIntersecting) activate(e.target, false);
        });
      }, {
        threshold: 0.3,
        rootMargin: '-80px 0px -40% 0px'
      });
      sections.forEach(section => {
        observer.observe(section);
        section.addEventListener('click', () => activate(section, true));
        section.addEventListener('mouseenter', () => {
          clearTimeout(hoverTimeout.current);
          hoverTimeout.current = setTimeout(() => activate(section, true), 80);
        });
      });
      if (sections[0]) activate(sections[0], false);
    };
    waitForLines();
  }, []);
  const handleCopy = e => {
    const btn = e.currentTarget;
    const codeLines = codeContentRef.current?.querySelectorAll('.code-line');
    if (!codeLines || codeLines.length === 0) return;
    const text = Array.from(codeLines).map(line => {
      const clone = line.cloneNode(true);
      const lineNumber = clone.querySelector('.line-number');
      if (lineNumber) lineNumber.remove();
      return clone.textContent;
    }).join('\n');
    if (!text) return;
    navigator.clipboard.writeText(text).then(() => {
      btn.dataset.copied = 'true';
      setTimeout(() => btn.dataset.copied = 'false', 1500);
    });
  };
  return <div className="code-panel" ref={codePanelRef}>
      <div className="code-header">
        {fileName}
        <button className="copy-btn" aria-label="Copy full code" data-copied="false" onClick={handleCopy}>
          <svg className="icon-copy" viewBox="0 0 15 16" fill="currentColor">
            <path d="M10.113 3.124H2.205C1.463 3.124.86 3.655.86 4.31v10.005c0 .654.603 1.186 1.345 1.186h7.908c.742 0 1.345-.532 1.345-1.186V4.31c0-.655-.606-1.186-1.345-1.186Z" />
            <path d="M13.138.5H5.229c-.742 0-1.344.531-1.344 1.186 0 .23.209.414.47.414s.47-.184.47-.414c0-.197.182-.357.404-.357h7.909c.223 0 .404.16.404.357V11.69c0 .196-.181.356-.404.356-.262 0-.47.184-.47.415 0 .23.208.415.47.415.742 0 1.344-.532 1.344-1.186V1.686C14.482 1.03 13.88.5 13.138.5Z" />
          </svg>

          <svg className="icon-check" viewBox="0 0 20 20" fill="currentColor">
            <path fillRule="evenodd" d="M16.707 5.293a1 1 0 010 1.414l-7.25 7.25a1 1 0 01-1.414 0l-3.25-3.25a1 1 0 011.414-1.414l2.543 2.543 6.543-6.543a1 1 0 011.414 0z" clipRule="evenodd" />
          </svg>
        </button>
      </div>

      <div className="code-content" ref={codeContentRef}>
        {children}
      </div>
    </div>;
};

export const ContentSection = ({id, title, lines, children}) => <div className="content-section" data-content-id={id} data-lines={lines}>
    {title && <h4>{title}</h4>}
    {children}
  </div>;

export const ContentPanel = ({children}) => <div className="content-panel">{children}</div>;

export const CodePreview = ({children}) => {
  const [instanceId] = useState(() => `preview-${Math.random().toString(36).slice(2)}`);
  useEffect(() => {
    const nav = document.querySelector('nav') || document.querySelector('header') || document.querySelector('[class*="nav"]');
    if (nav) {
      document.documentElement.style.setProperty('--navbar-height', `${nav.offsetHeight}px`);
    }
  }, []);
  return <div className="split-layout" data-preview-id={instanceId}>
      {children}
    </div>;
};

export const ConnectorDetailsHeader = ({name, icon, stage, availableFeatures, unavailableFeatures = [], availableFeaturesCollate = []}) => {
  const showSubHeading = availableFeatures?.length > 0 || unavailableFeatures?.length > 0 || availableFeaturesCollate?.length > 0;
  const totalAvailableFeatures = [...availableFeatures || [], ...availableFeaturesCollate || []];
  return <div className="container">
      <div className="Heading">
        <div className="flex items-center gap-3">
          {icon && <div className="IconContainer">
              <img src={icon} alt={name} noZoom className="ConnectorIcon" />
            </div>}
          <h1 className="ConnectorName">{name}</h1>
          <span className={`StageBadge ${stage === 'PROD' ? 'prod' : 'beta'}`}>
            {stage}
          </span>
        </div>
      </div>
      {showSubHeading && <div className="SubHeading">
          <div className="FeaturesHeading">Feature List</div>
          <div className="FeaturesList">
            {totalAvailableFeatures.map(feature => <div className="FeatureTag AvailableFeature" key={feature}>
                ✓ {feature}
              </div>)}
            {unavailableFeatures.map(feature => <div className="FeatureTag UnavailableFeature" key={feature}>
                ✕ {feature}
              </div>)}
          </div>
        </div>}
    </div>;
};

<ConnectorDetailsHeader icon="/public/images/connectors/kafka.webp" name="Kafka" stage="PROD" availableFeatures={["Topics", "Sample Data"]} unavailableFeatures={[]} />

In this section, we provide guides and references to use the Kafka connector.
Configure and schedule Kafka metadata and profiler workflows from the OpenMetadata UI:

* [Requirements](#requirements)
* [Metadata Ingestion](#metadata-ingestion)
* [Enable Security](#securing-kafka-connection-with-ssl-in-openmetadata)

## How to Run the Connector Externally

To run the Ingestion via the UI you'll need to use the OpenMetadata Ingestion Container, which comes shipped with
custom Airflow plugins to handle the workflow deployment.

If, instead, you want to manage your workflows externally on your preferred orchestrator, you can check
the following docs to run the Ingestion Framework **anywhere**.

<Columns cols={2}>
  <Card title="External Schedulers" href="/v1.12.x/deployment/ingestion">
    Get more information about running the Ingestion Framework Externally
  </Card>
</Columns>

## Requirements

### Python Requirements

<Tip>
  We have support for Python versions **3.9-3.11**
</Tip>

To run the Kafka ingestion, you will need to install:

```bash theme={null}
pip3 install "openmetadata-ingestion[kafka]"
```

## Metadata Ingestion

All connectors are defined as JSON Schemas.
[Here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json)
you can find the structure to create a connection to Kafka.
In order to create and run a Metadata Ingestion workflow, we will follow
the steps to create a YAML configuration able to connect to the source,
process the Entities if needed, and reach the OpenMetadata server.
The workflow is modeled around the following
[JSON Schema](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json)

### 1. Define the YAML Config

This is a sample config for Kafka:

<CodePreview>
  <ContentPanel>
    #### Source Configuration - Source Config

    The sourceConfig is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/messagingServiceMetadataPipeline.json):

    <div>
      **generateSampleData:** Option to turn on/off generating sample data during metadata extraction.
    </div>

    <div>
      **topicFilterPattern:** Note that the `topicFilterPattern` supports regex as include or exclude.
    </div>

    <div>
      **generateSampleData:** Option to turn on/off generating sample data during metadata extraction. `generateSampleData` supports boolean value either `true` or `false`.
    </div>

    <div>
      **markDeletedTopics:** Optional configuration to soft delete topics in OpenMetadata if the source topics are deleted. Also, if the topic is deleted, all the associated entities like sample data, lineage, etc., with that topic will be deleted. `markDeletedTopics` supports boolean value either `true` or `false`.
    </div>

    <div>
      **overrideMetadata:** Set the 'Override Metadata' toggle to control whether to override the existing metadata in the OpenMetadata server with the metadata fetched from the source. If the toggle is set to true, the metadata fetched from the source will override the existing metadata in the OpenMetadata server. If the toggle is set to false, the metadata fetched from the source will not override the existing metadata in the OpenMetadata server. This is applicable for fields like description, tags, owner and displayName. `overrideMetadata` supports boolean value either `true` or `false`.
    </div>

    To send the metadata to OpenMetadata, it needs to be specified as `type: metadata-rest`.

    <div>
      The main property here is the `openMetadataServerConfig`, where you can define the host and security provider of your OpenMetadata installation.
    </div>

    <div>
      **Logger Level**

      You can specify the `loggerLevel` depending on your needs. If you are trying to troubleshoot an ingestion, running with `DEBUG` will give you far more traces for identifying issues.
    </div>

    <div>
      **JWT Token**

      JWT tokens will allow your clients to authenticate against the OpenMetadata server. To enable JWT Tokens, you will get more details [here](/deployment/security/enable-jwt-tokens).

      You can refer to the JWT Troubleshooting section [link](/deployment/security/jwt-troubleshooting) for any issues in your JWT configuration.
    </div>

    <div>
      **Store Service Connection**

      If set to `true` (default), we will store the sensitive information either encrypted via the Fernet Key in the database or externally, if you have configured any [Secrets Manager](/deployment/secrets-manager).

      If set to `false`, the service will be created, but the service connection information will only be used by the Ingestion Framework at runtime, and won't be sent to the OpenMetadata server.
    </div>

    <div>
      **SSL Configuration**

      If you have added SSL to the [OpenMetadata server](/deployment/security/enable-ssl), then you will need to handle the certificates when running the ingestion too. You can either set `verifySSL` to `ignore`, or have it as `validate`, which will require you to set the `sslConfig.caCertificate` with a local path where your ingestion runs that points to the server certificate file.

      Find more information on how to troubleshoot SSL issues [here](/deployment/security/enable-ssl/ssl-troubleshooting).
    </div>

    <div>
      **ingestionPipelineFQN**

      Fully qualified name of ingestion pipeline, used to identify the current ingestion pipeline.
    </div>

    <ContentSection id={1} title="Source Configuration" lines="1-3">
      Configure the source type and service name.
    </ContentSection>

    <ContentSection id={2} title="bootstrapServers" lines="7">
      **bootstrapServers**: List of brokers as comma separated values of broker `host` or `host:port`.
      Example: `host1:7-807-82,host2:7-807-82`
    </ContentSection>

    <ContentSection id={3} title="schemaRegistryURL" lines="8">
      **schemaRegistryURL**: URL of the Schema Registry used to ingest the schemas of the topics.
      If you encounter issues connecting to the Schema Registry, ensure that the protocol is explicitly specified in the Schema Registry URL. For example:

      * Use `http://localhost:8081` instead of `localhost:8081`.
        The Schema Registry requires a properly formatted URL, including the protocol (`http://` or `https://`). While this differentiation is expected in the Schema Registry configuration, it may not be immediately apparent.
        **NOTE**: For now, the schema will be the last version found for the schema name `{topic-name}-value`. An [issue](https://github.com/open-metadata/OpenMetadata/issues/8399) to improve how it currently works has been opened.
    </ContentSection>

    <ContentSection id={4} title="saslUsername" lines="9">
      **saslUsername**: SASL username for use with the PLAIN and SASL-SCRAM mechanisms.
    </ContentSection>

    <ContentSection id={5} title="saslPassword" lines="10">
      **saslPassword**: SASL password for use with the PLAIN and SASL-SCRAM mechanisms.
    </ContentSection>

    <ContentSection id={6} title="saslMechanism" lines="11">
      **saslMechanism**: SASL mechanism to use for authentication.
      Supported: *GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER*.
      **NOTE**: Despite the name only one mechanism must be configured.
    </ContentSection>

    <ContentSection id={7} title="basicAuthUserInfo" lines="12">
      **basicAuthUserInfo**: Schema Registry Client HTTP credentials in the form of `username:password`.
      By default, user info is extracted from the URL if present.
    </ContentSection>

    <ContentSection id={8} title="consumerConfig" lines="13">
      **consumerConfig**: The accepted additional values for the consumer configuration can be found in the following
      [link](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.mdx).
    </ContentSection>

    <ContentSection id={9} title="schemaRegistryConfig" lines="18">
      **schemaRegistryConfig**: The accepted additional values for the Schema Registry configuration can be found in the
      following [link](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#schemaregistryclient).
      **Note:** To ingest the topic schema, `schemaRegistryURL` must be passed.
    </ContentSection>

    <ContentSection id={10} title="securityProtocol" lines="21-22">
      **securityProtocol**: security.protocol consumer config property. It accepts `PLAINTEXT`,`SASL_PLAINTEXT`, `SASL_SSL`, `SSL`.
    </ContentSection>

    <ContentSection id={11} title="schemaRegistryTopicSuffixName" lines="23">
      **schemaRegistryTopicSuffixName**: Schema Registry Topic Suffix Name. The suffix to be appended to the topic name to get topic schema from registry.
    </ContentSection>

    <ContentSection id={12} title="schemaRegistrySSL" lines="21">
      **schemaRegistrySSL**: Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry connection.
    </ContentSection>

    <ContentSection id={13} title="supportsMetadataExtraction" lines="34">
      **supportsMetadataExtraction**: Supports Metadata Extraction. `supportsMetadataExtraction` supports boolean value either true or false.
    </ContentSection>
  </ContentPanel>

  <CodePanel fileName="connector_config.yaml">
    ```yaml theme={null}
    source:
      type: kafka
      serviceName: local_kafka
      serviceConnection:
        config:
          type: Kafka
    bootstrapServers: localhost:9092  # REQUIRED - Kafka broker addresses
    schemaRegistryURL: http://localhost:8081  # Schema Registry URL
    saslUsername: username  # SASL authentication username
    saslPassword: password  # SASL authentication password
    saslMechanism: PLAIN  # SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER
    basicAuthUserInfo: username:password  # Schema Registry basic auth
    consumerConfig:
            # Example consumer configurations:
            # auto.offset.reset: "earliest"
            # max.poll.records: 500
            # session.timeout.ms: 30000
    schemaRegistryConfig:
            # Example schema registry configurations:
            # schema.registry.basic.auth.credentials.source: "USER_INFO"
            # schema.registry.ssl.truststore.location: "/path/to/truststore.jks"
    securityProtocol: PLAINTEXT  # Security protocol: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
    # schemaRegistryTopicSuffixName: -value
    # SSL Configuration for Consumer connections (when securityProtocol is SSL or SASL_SSL)
          # consumerConfigSSL:
          #   caCertificate: "/path/to/ca-cert.pem"
          #   sslCertificate: "/path/to/client-cert.pem"
          #   sslKey: "/path/to/client-key.pem"
    # SSL Configuration for Schema Registry connections
          # schemaRegistrySSL:
          #   caCertificate: "/path/to/schema-registry-ca.pem"
          #   sslCertificate: "/path/to/schema-registry-cert.pem"
          #   sslKey: "/path/to/schema-registry-key.pem"
    # supportsMetadataExtraction: true
    ```

    ```yaml theme={null}
      sourceConfig:
        config:
          type: MessagingMetadata
          topicFilterPattern:
            excludes:
              - _confluent.*
          # includes:
          #   - topic1
          # generateSampleData: true
          # generateSampleData: false # true
          # markDeletedTopics: true # false
          # overrideMetadata:  false # true

    ```

    ```yaml theme={null}
    sink:
      type: metadata-rest
      config: {}
    ```

    ```yaml theme={null}
    workflowConfig:
      loggerLevel: INFO  # DEBUG, INFO, WARNING or ERROR
      openMetadataServerConfig:
        hostPort: "http://localhost:8585/api"
        authProvider: openmetadata
        securityConfig:
          jwtToken: "{bot_jwt_token}"
        ## Store the service Connection information
        storeServiceConnection: true  # false
        ## Secrets Manager Configuration
        # secretsManagerProvider: aws, azure or noop
        # secretsManagerLoader: airflow or env
        ## If SSL, fill the following
        # verifySSL: validate  # or ignore
        # sslConfig:
        #   caCertificate: /local/path/to/certificate
    # ingestionPipelineFQN: <service name>.<ingestion name> ## e.g., "my_redshift.metadata"
    ```
  </CodePanel>
</CodePreview>

## Securing Kafka Connection with SSL in OpenMetadata

To establish secure connections between OpenMetadata and Kafka, in the `YAML` you can provide the CA certificate used for SSL validation by specifying the `caCertificate`. Alternatively, if both client and server require mutual authentication, you'll need to use all three parameters: `ssl key`, `ssl cert`, and `caCertificate`. In this case, `ssl_cert` is used for the client’s SSL certificate, `ssl_key` for the private key associated with the SSL certificate, and `caCertificate` for the CA certificate to validate the server’s certificate.

```yaml theme={null}
      sslConfig:
            caCertificate: "/path/to/ca_certificate"
            sslCertificate: "/path/to/your/ssl_cert"
            sslKey: "/path/to/your/ssl_key"
```

### 2. Run with the CLI

First, we will need to save the YAML file. Afterward, and with all requirements installed, we can run:

```bash theme={null}
metadata ingest -c <path-to-yaml>
```

Note that from connector to connector, this recipe will always be the same. By updating the YAML configuration,
you will be able to extract metadata from different sources.
