> ## Documentation Index
> Fetch the complete documentation index at: https://docs.open-metadata.org/llms.txt
> Use this file to discover all available pages before exploring further.

# Run the Kinesis Connector Externally

> Use YAML to configure ingestion from Kinesis streams and extract topic, schema, and lineage metadata.

export const CodePanel = ({children, fileName = 'config.yaml', showLineNumbers = false}) => {
  const codePanelRef = useRef(null);
  const codeContentRef = useRef(null);
  const isProgrammaticScroll = useRef(false);
  const hoverTimeout = useRef(null);
  useEffect(() => {
    let tries = 0;
    const wrapLines = () => {
      const root = codeContentRef.current;
      if (!root) return;
      const pres = Array.from(root.querySelectorAll('pre'));
      if (!pres.length) {
        if (tries++ < 20) requestAnimationFrame(wrapLines);
        return;
      }
      let globalLine = 1;
      pres.forEach(pre => {
        const code = pre.querySelector('code') || pre;
        if (!code || code.dataset.wrapped === 'true') return;
        const raw = code.textContent || '';
        let lines = raw.split('\n');
        while (lines[0] === '') lines.shift();
        while (lines[lines.length - 1] === '') lines.pop();
        code.innerHTML = lines.map(line => {
          const ln = globalLine++;
          const num = showLineNumbers ? `<span class="line-number">${ln}</span>` : '';
          const safe = line.replace(/</g, '&lt;').replace(/>/g, '&gt;') || ' ';
          return `<span class="code-line" data-line="${ln}">${num}${safe}</span>`;
        }).join('');
        code.dataset.wrapped = 'true';
      });
    };
    wrapLines();
  }, [children, showLineNumbers]);
  useEffect(() => {
    const panel = codePanelRef.current;
    const content = codeContentRef.current;
    if (!panel || !content) return;
    const waitForLines = () => {
      const codeLines = content.querySelectorAll('.code-line');
      if (!codeLines.length) {
        requestAnimationFrame(waitForLines);
        return;
      }
      setupHighlighting(codeLines);
    };
    const setupHighlighting = codeLines => {
      const layout = panel.closest('.split-layout');
      const sections = layout.querySelectorAll('.content-section');
      const parseLines = str => {
        if (!str) return [];
        const out = [];
        str.split(',').forEach(p => {
          if (p.includes('-')) {
            const [s, e] = p.split('-').map(Number);
            for (let i = s; i <= e; i++) out.push(i);
          } else {
            const n = Number(p);
            if (!isNaN(n)) out.push(n);
          }
        });
        return out;
      };
      const clearHighlight = () => {
        codeLines.forEach(l => l.classList.remove('highlighted'));
      };
      const highlight = lines => {
        clearHighlight();
        lines.forEach(n => {
          const el = content.querySelector(`.code-line[data-line="${n}"]`);
          if (el) el.classList.add('highlighted');
        });
      };
      const scrollToLines = lines => {
        if (!lines.length) return;
        const first = lines[0];
        const targetLine = lines.length > 1 ? first : lines[0];
        const el = content.querySelector(`.code-line[data-line="${targetLine}"]`);
        if (!el) return;
        isProgrammaticScroll.current = true;
        const containerRect = content.getBoundingClientRect();
        const elRect = el.getBoundingClientRect();
        const offset = elRect.top - containerRect.top + content.scrollTop;
        const TOP_PADDING = 16;
        content.scrollTo({
          top: Math.max(offset - TOP_PADDING, 0),
          behavior: 'smooth'
        });
        setTimeout(() => {
          isProgrammaticScroll.current = false;
        }, 200);
      };
      const activate = (section, scroll) => {
        if (section.classList.contains('active')) return;
        sections.forEach(s => s.classList.remove('active'));
        section.classList.add('active');
        const lines = parseLines(section.dataset.lines);
        highlight(lines);
        if (scroll) scrollToLines(lines);
      };
      const observer = new IntersectionObserver(entries => {
        if (isProgrammaticScroll.current) return;
        entries.forEach(e => {
          if (e.isIntersecting) activate(e.target, false);
        });
      }, {
        threshold: 0.3,
        rootMargin: '-80px 0px -40% 0px'
      });
      sections.forEach(section => {
        observer.observe(section);
        section.addEventListener('click', () => activate(section, true));
        section.addEventListener('mouseenter', () => {
          clearTimeout(hoverTimeout.current);
          hoverTimeout.current = setTimeout(() => activate(section, true), 80);
        });
      });
      if (sections[0]) activate(sections[0], false);
    };
    waitForLines();
  }, []);
  const handleCopy = e => {
    const btn = e.currentTarget;
    const codeLines = codeContentRef.current?.querySelectorAll('.code-line');
    if (!codeLines || codeLines.length === 0) return;
    const text = Array.from(codeLines).map(line => {
      const clone = line.cloneNode(true);
      const lineNumber = clone.querySelector('.line-number');
      if (lineNumber) lineNumber.remove();
      return clone.textContent;
    }).join('\n');
    if (!text) return;
    navigator.clipboard.writeText(text).then(() => {
      btn.dataset.copied = 'true';
      setTimeout(() => btn.dataset.copied = 'false', 1500);
    });
  };
  return <div className="code-panel" ref={codePanelRef}>
      <div className="code-header">
        {fileName}
        <button className="copy-btn" aria-label="Copy full code" data-copied="false" onClick={handleCopy}>
          <svg className="icon-copy" viewBox="0 0 15 16" fill="currentColor">
            <path d="M10.113 3.124H2.205C1.463 3.124.86 3.655.86 4.31v10.005c0 .654.603 1.186 1.345 1.186h7.908c.742 0 1.345-.532 1.345-1.186V4.31c0-.655-.606-1.186-1.345-1.186Z" />
            <path d="M13.138.5H5.229c-.742 0-1.344.531-1.344 1.186 0 .23.209.414.47.414s.47-.184.47-.414c0-.197.182-.357.404-.357h7.909c.223 0 .404.16.404.357V11.69c0 .196-.181.356-.404.356-.262 0-.47.184-.47.415 0 .23.208.415.47.415.742 0 1.344-.532 1.344-1.186V1.686C14.482 1.03 13.88.5 13.138.5Z" />
          </svg>

          <svg className="icon-check" viewBox="0 0 20 20" fill="currentColor">
            <path fillRule="evenodd" d="M16.707 5.293a1 1 0 010 1.414l-7.25 7.25a1 1 0 01-1.414 0l-3.25-3.25a1 1 0 011.414-1.414l2.543 2.543 6.543-6.543a1 1 0 011.414 0z" clipRule="evenodd" />
          </svg>
        </button>
      </div>

      <div className="code-content" ref={codeContentRef}>
        {children}
      </div>
    </div>;
};

export const ContentSection = ({id, title, lines, children}) => <div className="content-section" data-content-id={id} data-lines={lines}>
    {title && <h4>{title}</h4>}
    {children}
  </div>;

export const ContentPanel = ({children}) => <div className="content-panel">{children}</div>;

export const CodePreview = ({children}) => {
  const [instanceId] = useState(() => `preview-${Math.random().toString(36).slice(2)}`);
  useEffect(() => {
    const nav = document.querySelector('nav') || document.querySelector('header') || document.querySelector('[class*="nav"]');
    if (nav) {
      document.documentElement.style.setProperty('--navbar-height', `${nav.offsetHeight}px`);
    }
  }, []);
  return <div className="split-layout" data-preview-id={instanceId}>
      {children}
    </div>;
};

export const ConnectorDetailsHeader = ({name, icon, stage, availableFeatures, unavailableFeatures = [], availableFeaturesCollate = []}) => {
  const showSubHeading = availableFeatures?.length > 0 || unavailableFeatures?.length > 0 || availableFeaturesCollate?.length > 0;
  const totalAvailableFeatures = [...availableFeatures || [], ...availableFeaturesCollate || []];
  return <div className="container">
      <div className="Heading">
        <div className="flex items-center gap-3">
          {icon && <div className="IconContainer">
              <img src={icon} alt={name} noZoom className="ConnectorIcon" />
            </div>}
          <h1 className="ConnectorName">{name}</h1>
          <span className={`StageBadge ${stage === 'PROD' ? 'prod' : 'beta'}`}>
            {stage}
          </span>
        </div>
      </div>
      {showSubHeading && <div className="SubHeading">
          <div className="FeaturesHeading">Feature List</div>
          <div className="FeaturesList">
            {totalAvailableFeatures.map(feature => <div className="FeatureTag AvailableFeature" key={feature}>
                ✓ {feature}
              </div>)}
            {unavailableFeatures.map(feature => <div className="FeatureTag UnavailableFeature" key={feature}>
                ✕ {feature}
              </div>)}
          </div>
        </div>}
    </div>;
};

<ConnectorDetailsHeader icon="/public/images/connectors/kinesis.webp" name="Kinesis" stage="PROD" availableFeatures={["Topics", "Sample Data"]} unavailableFeatures={[]} />

In this section, we provide guides and references to use the Kinesis connector.
Configure and schedule Kinesis metadata workflows externally:

* [Requirements](#requirements)
* [Metadata Ingestion](#metadata-ingestion)

## How to Run the Connector Externally

To run the Ingestion via the UI you'll need to use the OpenMetadata Ingestion Container, which comes shipped with
custom Airflow plugins to handle the workflow deployment.

If, instead, you want to manage your workflows externally on your preferred orchestrator, you can check
the following docs to run the Ingestion Framework **anywhere**.

<Columns cols={2}>
  <Card title="External Schedulers" href="/v1.12.x/deployment/ingestion">
    Get more information about running the Ingestion Framework Externally
  </Card>
</Columns>

## Requirements

OpenMetadata retrieves information about streams and sample data from the streams in the AWS account.
The user must have the following policy set to access the metadata from Kinesis.

```json theme={null}
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "KinesisPolicy",
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": "*"
        }
    ]
}
```

For more information on Kinesis permissions visit the [AWS Kinesis official documentation](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).

### Python Requirements

<Tip>
  We have support for Python versions **3.9-3.11**
</Tip>

To run the Kinesis ingestion, you will need to install:

```bash theme={null}
pip3 install "openmetadata-ingestion[kinesis]"
```

## Metadata Ingestion

All connectors are defined as JSON Schemas.
[Here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kinesisConnection.json)
you can find the structure to create a connection to Kinesis.
In order to create and run a Metadata Ingestion workflow, we will follow
the steps to create a YAML configuration able to connect to the source,
process the Entities if needed, and reach the OpenMetadata server.
The workflow is modeled around the following
[JSON Schema](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json).

### 1. Define the YAML Config

This is a sample config for Kinesis:

#### Source Configuration - Service Connection

<CodePreview>
  <ContentPanel>
    <ContentSection id={3} title="AWS Configuration" lines="7-16">
      AWS Access Key Credentials

      **awsAccessKeyId** and **awsSecretAccessKey** are used to authenticate and authorize programmatic requests to AWS services.

      An access key consists of:

      * **Access Key ID** (for example, `AKIAIOSFODNN7EXAMPLE`)
      * **Secret Access Key** (for example, `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`)

      Both values must be provided together when using static credentials.

      For more information, see
      [Managing access keys](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html).

      AWS Session Token

      **awsSessionToken** is required when using **temporary security credentials**, such as those obtained via AWS STS.

      The session token must be provided along with the access key ID and secret access key for the duration of the session.

      AWS Region

      **awsRegion** specifies the AWS Region where the target service is deployed (for example, `us-east-1`).

      This is the **only required parameter** when configuring an AWS connection. Other credentials can be resolved automatically using environment variables, AWS profiles, or IAM roles.

      Learn more in the
      [AWS Regions and Availability Zones documentation](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html).

      Custom Endpoint URL

      **endPointURL** is an optional custom endpoint used to connect to an AWS service.

      You may want to specify this when:

      * Using VPC endpoints
      * Connecting to local or AWS-compatible services
      * Overriding the default regional endpoint

      See
      [AWS service endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html) for details.

      AWS Profile Name

      **profileName** specifies the AWS CLI profile to use for authentication.

      Profiles store credentials and configuration in AWS config files.
      If not specified, the `default` profile is used.

      Learn more about
      [Named profiles for the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html).

      Assume Role ARN

      **assumeRoleArn** is the Amazon Resource Name (ARN) of the IAM role to assume.

      This is commonly used for:

      * Cross-account access
      * Delegated permissions
      * Enhanced security setups

      This field is **required** when using Assume Role authentication.

      See the
      [AssumeRole API reference](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html).

      Assume Role Session Name

      **assumeRoleSessionName** identifies the assumed role session.

      This value helps uniquely identify a session when the same role is assumed multiple times or by different principals.

      If not provided, the default value `OpenMetadataSession` is used.

      Assume Role Source Identity

      **assumeRoleSourceIdentity** is an optional source identity passed when assuming a role.

      This value is recorded in AWS CloudTrail logs and can be used to trace actions performed using the assumed role.

      See
      [Source Identity in AssumeRole](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html).
    </ContentSection>

    <ContentSection id={2} title="Source Configuration" lines="17-28">
      #### Source Configuration - Source Config

      The sourceConfig is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/messagingServiceMetadataPipeline.json):

      <div>
        **generateSampleData:** Option to turn on/off generating sample data during metadata extraction.
      </div>

      <div>
        **topicFilterPattern:** Note that the `topicFilterPattern` supports regex as include or exclude.
      </div>

      <div>
        **generateSampleData:** Option to turn on/off generating sample data during metadata extraction. `generateSampleData` supports boolean value either `true` or `false`.
      </div>

      <div>
        **markDeletedTopics:** Optional configuration to soft delete topics in OpenMetadata if the source topics are deleted. When a topic is deleted, all entities associated with it, such as sample data and lineage, are deleted as well. `markDeletedTopics` supports boolean value either `true` or `false`.
      </div>

      <div>
        **overrideMetadata:** Set the 'Override Metadata' toggle to control whether to override the existing metadata in the OpenMetadata server with the metadata fetched from the source. If the toggle is set to true, the metadata fetched from the source will override the existing metadata in the OpenMetadata server. If the toggle is set to false, the metadata fetched from the source will not override the existing metadata in the OpenMetadata server. This is applicable for fields like description, tags, owner and displayName. `overrideMetadata` supports boolean value either `true` or `false`.
      </div>
    </ContentSection>

    To send the metadata to OpenMetadata, the sink must be specified as `type: metadata-rest`.

    <ContentSection id={1} title="Workflow Configuration" lines="32-48">
      <div>
        The main property here is the `openMetadataServerConfig`, where you can define the host and security provider of your OpenMetadata installation.
      </div>

      <div>
        **Logger Level**

        You can specify the `loggerLevel` depending on your needs. If you are trying to troubleshoot an ingestion, running with `DEBUG` will give you far more traces for identifying issues.
      </div>

      <div>
        **JWT Token**

        JWT tokens allow your clients to authenticate against the OpenMetadata server. For details on enabling JWT tokens, see [here](/deployment/security/enable-jwt-tokens).

        If you run into issues with your JWT configuration, refer to the [JWT Troubleshooting](/deployment/security/jwt-troubleshooting) section.
      </div>

      <div>
        **Store Service Connection**

        If set to `true` (default), we will store the sensitive information either encrypted via the Fernet Key in the database or externally, if you have configured any [Secrets Manager](/deployment/secrets-manager).

        If set to `false`, the service will be created, but the service connection information will only be used by the Ingestion Framework at runtime, and won't be sent to the OpenMetadata server.
      </div>

      <div>
        **SSL Configuration**

        If you have added SSL to the [OpenMetadata server](/deployment/security/enable-ssl), then you will need to handle the certificates when running the ingestion too. You can either set `verifySSL` to `ignore`, or have it as `validate`, which will require you to set the `sslConfig.caCertificate` with a local path where your ingestion runs that points to the server certificate file.

        Find more information on how to troubleshoot SSL issues [here](/deployment/security/enable-ssl/ssl-troubleshooting).
      </div>

      <div>
        **ingestionPipelineFQN**

        Fully qualified name of ingestion pipeline, used to identify the current ingestion pipeline.
      </div>
    </ContentSection>

    <ContentSection id={1} title="Source Configuration" lines="1-3">
      Configure the source type and service name.
    </ContentSection>

    <ContentSection id={2} title="AWS Configuration" lines="7">
      Configure AWS credentials and settings for accessing Kinesis streams.
    </ContentSection>
  </ContentPanel>

  <CodePanel fileName="connector_config.yaml">
    ```yaml theme={null}
    source:
      type: kinesis
      serviceName: local_kinesis
      serviceConnection:
        config:
          type: Kinesis
          awsConfig:
    ```

    ```yaml theme={null}
             awsAccessKeyId: KEY
             awsSecretAccessKey: SECRET
    ```

    ```yaml theme={null}
            # awsSessionToken: TOKEN
    ```

    ```yaml theme={null}
             awsRegion: us-east-2
    ```

    ```yaml theme={null}
            # endPointURL: https://kinesis.us-east-2.amazonaws.com/custom
    ```

    ```yaml theme={null}
            # profileName: profile
    ```

    ```yaml theme={null}
            # assumeRoleArn: "arn:partition:service:region:account:resource"
    ```

    ```yaml theme={null}
            # assumeRoleSessionName: session
    ```

    ```yaml theme={null}
            # assumeRoleSourceIdentity: identity
    ```

    ```yaml theme={null}
      sourceConfig:
        config:
          type: MessagingMetadata
          topicFilterPattern:
            excludes:
              - _confluent.*
          # includes:
          #   - topic1
          # generateSampleData: true
          # generateSampleData: false # true
          # markDeletedTopics: true # false
          # overrideMetadata:  false # true

    ```

    ```yaml theme={null}
    sink:
      type: metadata-rest
      config: {}
    ```

    ```yaml theme={null}
    workflowConfig:
      loggerLevel: INFO  # DEBUG, INFO, WARNING or ERROR
      openMetadataServerConfig:
        hostPort: "http://localhost:8585/api"
        authProvider: openmetadata
        securityConfig:
          jwtToken: "{bot_jwt_token}"
        ## Store the service Connection information
        storeServiceConnection: true  # false
        ## Secrets Manager Configuration
        # secretsManagerProvider: aws, azure or noop
        # secretsManagerLoader: airflow or env
        ## If SSL, fill the following
        # verifySSL: validate  # or ignore
        # sslConfig:
        #   caCertificate: /local/path/to/certificate
    # ingestionPipelineFQN: <service name>.<ingestion name> ## e.g., "my_redshift.metadata"
    ```
  </CodePanel>
</CodePreview>

### 2. Run with the CLI

First, we will need to save the YAML file. Afterward, and with all requirements installed, we can run:

```bash theme={null}
metadata ingest -c <path-to-yaml>
```

Note that from connector to connector, this recipe will always be the same. By updating the YAML configuration,
you will be able to extract metadata from different sources.
