Run Datalake using the Airflow SDK

Stage: PROD
Feature matrix columns: Metadata, Query Usage, Data Profiler, Data Quality, Lineage, DBT, Supported Versions
Lineage levels: Table-level, Column-level

In this section, we provide guides and references to use the Datalake connector.

Configure and schedule Datalake metadata and profiler workflows from the OpenMetadata UI:

To run the Ingestion via the UI you'll need to use the OpenMetadata Ingestion Container, which comes shipped with custom Airflow plugins to handle the workflow deployment.

Note

The Datalake connector supports extracting metadata from JSON, CSV, TSV, and Parquet files.

S3 Permissions

To run metadata extraction, the AWS account must have enough access to fetch the required data. The Bucket Policy in AWS requires at least these permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<my bucket>",
                "arn:aws:s3:::<my bucket>/*"
            ]
        }
    ]
}
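
When several buckets are ingested, the policy above can be generated rather than edited by hand. The following is a minimal sketch using only the Python standard library; the function name `build_bucket_policy` and the bucket name in the usage line are illustrative assumptions, not part of OpenMetadata or AWS tooling.

```python
import json


def build_bucket_policy(bucket_name: str) -> dict:
    """Return the read-only policy document shown above for one bucket."""
    bucket_arn = f"arn:aws:s3:::{bucket_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:ListBucket"],
                # The bucket ARN and the ARN of the objects inside it,
                # matching the two Resource entries in the sample policy.
                "Resource": [bucket_arn, f"{bucket_arn}/*"],
            }
        ],
    }


if __name__ == "__main__":
    # "my-datalake-bucket" is a hypothetical bucket name.
    print(json.dumps(build_bucket_policy("my-datalake-bucket"), indent=4))
```

The printed JSON has the same shape as the sample policy, with the placeholder replaced by the given bucket name.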

If you are running OpenMetadata version 0.13 or later, install the Datalake ingestion package for the storage service you use (S3, GCS, or Azure):

S3 installation

pip3 install "openmetadata-ingestion[datalake-s3]"

GCS installation

pip3 install "openmetadata-ingestion[datalake-gcs]"

Azure installation

pip3 install "openmetadata-ingestion[datalake-azure]"

If version <0.13

In this case, the requirements for S3 and GCS are installed together:

pip3 install "openmetadata-ingestion[datalake]"
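
The choice between the install commands above can be summarized in a few lines of Python. This is only a sketch: the helper name `datalake_pip_requirement` is hypothetical, and treating version 0.13 itself as using the per-storage extras is an assumption.

```python
def datalake_pip_requirement(version: str, storage: str) -> str:
    """Pick the pip requirement string for a given OpenMetadata version.

    `storage` is one of "s3", "gcs" or "azure".
    """
    extras = {"s3": "datalake-s3", "gcs": "datalake-gcs", "azure": "datalake-azure"}
    if storage not in extras:
        raise ValueError(f"Unknown storage type: {storage!r}")

    # Compare only the major and minor components, e.g. "0.13.1" -> (0, 13).
    major, minor = (int(part) for part in version.split(".")[:2])
    if (major, minor) < (0, 13):
        # Before 0.13 the page lists a single combined extra (S3 and GCS).
        return "openmetadata-ingestion[datalake]"
    # Assumption: 0.13 itself already uses the per-storage extras.
    return f"openmetadata-ingestion[{extras[storage]}]"


if __name__ == "__main__":
    print("pip3 install", f'"{datalake_pip_requirement("0.13.1", "s3")}"')
```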

All connectors are defined as JSON Schemas. Here you can find the structure to create a connection to Datalake.

In order to create and run a Metadata Ingestion workflow, we will follow the steps to create a YAML configuration able to connect to the source, process the Entities if needed, and reach the OpenMetadata server.

The workflow is modeled around the following JSON Schema.

This is a sample config for Datalake using AWS S3:


source:
  type: datalake
  serviceName: local_datalake
  serviceConnection:
    config:
      type: Datalake
      configSource:      
        securityConfig: 
          awsAccessKeyId: aws access key id
          awsSecretAccessKey: aws secret access key
          awsRegion: aws region
      bucketName: bucket name
      prefix: prefix
  sourceConfig:
    config:
      type: DatabaseMetadata
      tableFilterPattern:
        includes:
        - ''
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  # loggerLevel: DEBUG  # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: "<OpenMetadata host and port>"  # e.g. http://localhost:8585/api
    authProvider: "<OpenMetadata auth provider>"

Source Configuration - Source Config using AWS S3

The sourceConfig is defined here.

  • awsAccessKeyId: Enter the access key ID for your AWS S3 connection. The specified key ID should be authorized to read all the buckets you want to include in the metadata ingestion workflow.
  • awsSecretAccessKey: Enter the Secret Access Key (the secret paired with the access key ID above).
  • awsRegion: Specify the region in which your S3 bucket is located. This setting is required even if you have configured a local AWS profile.
  • schemaFilterPattern and tableFilterPattern: Both support regular expressions in their includes and excludes lists.
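
Since the Airflow DAG further down this page passes a parsed dictionary to `Workflow.create`, the S3 `source` section can also be assembled in Python so that credentials are not written into the YAML. The sketch below is one possible approach: the function name `build_s3_source` and the environment variable names are assumptions chosen for illustration, while the dictionary keys mirror the S3 sample config.

```python
import os
from typing import Mapping, Optional


def build_s3_source(
    bucket_name: str,
    prefix: str,
    env: Optional[Mapping[str, str]] = None,
) -> dict:
    """Build the `source` section of the S3 sample config as a dictionary."""
    # Read from the process environment unless a mapping is passed in.
    env = os.environ if env is None else env
    security_config = {
        # Assumed variable names; adjust them to your own deployment.
        "awsAccessKeyId": env["AWS_ACCESS_KEY_ID"],
        "awsSecretAccessKey": env["AWS_SECRET_ACCESS_KEY"],
        "awsRegion": env["AWS_REGION"],
    }
    return {
        "type": "datalake",
        "serviceName": "local_datalake",
        "serviceConnection": {
            "config": {
                "type": "Datalake",
                "configSource": {"securityConfig": security_config},
                "bucketName": bucket_name,
                "prefix": prefix,
            }
        },
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    }
```

A missing variable raises `KeyError` when the dictionary is built, which surfaces the problem before any connection to AWS is attempted.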

This is a sample config for Datalake using GCS:

source:
  type: datalake
  serviceName: local_datalake
  serviceConnection:
    config:
      type: Datalake
      configSource:
        securityConfig:
          gcsConfig:
            type: type of account
            projectId: project id
            privateKeyId: private key id
            privateKey: private key
            clientEmail: client email
            clientId: client id
            authUri: https://accounts.google.com/o/oauth2/auth
            tokenUri: https://oauth2.googleapis.com/token
            authProviderX509CertUrl: https://www.googleapis.com/oauth2/v1/certs
            clientX509CertUrl:  clientX509 Certificate Url
      bucketName: bucket name
      prefix: prefix
  sourceConfig:
    config:
      tableFilterPattern:
        includes:
          - ''
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  # loggerLevel: DEBUG  # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: "<OpenMetadata host and port>"  # e.g. http://localhost:8585/api
    authProvider: "<OpenMetadata auth provider>"

Source Configuration - Service Connection using GCS

The sourceConfig is defined here.
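
The `gcsConfig` fields in the GCS sample correspond to the fields of a Google service-account JSON key file, with snake_case names turned into camelCase. A minimal sketch of that mapping follows; the helper names `gcs_config_from_key` and `load_gcs_config` are hypothetical, and the target field names are taken from the sample config above.

```python
import json

# Key-file field name -> gcsConfig field name used in the sample config.
KEY_FILE_TO_GCS_CONFIG = {
    "type": "type",
    "project_id": "projectId",
    "private_key_id": "privateKeyId",
    "private_key": "privateKey",
    "client_email": "clientEmail",
    "client_id": "clientId",
    "auth_uri": "authUri",
    "token_uri": "tokenUri",
    "auth_provider_x509_cert_url": "authProviderX509CertUrl",
    "client_x509_cert_url": "clientX509CertUrl",
}


def gcs_config_from_key(key: dict) -> dict:
    """Translate a parsed service-account key into gcsConfig fields."""
    missing = [name for name in KEY_FILE_TO_GCS_CONFIG if name not in key]
    if missing:
        raise KeyError(f"Key file is missing fields: {missing}")
    return {target: key[name] for name, target in KEY_FILE_TO_GCS_CONFIG.items()}


def load_gcs_config(path: str) -> dict:
    """Read a key file from disk and return the gcsConfig dictionary."""
    with open(path, encoding="utf-8") as handle:
        return gcs_config_from_key(json.load(handle))
```

The returned dictionary can be placed under `securityConfig.gcsConfig` in the workflow configuration. Any extra fields in the key file are ignored.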

Source Configuration - Source Config

The sourceConfig is defined here:

  • markDeletedTables: Flags tables as soft-deleted if they are no longer present in the source system.
  • includeTables: true or false, to ingest table data. Default is true.
  • includeViews: true or false, to ingest view definitions.
  • databaseFilterPattern, schemaFilterPattern, tableFilterPattern: These support regular expressions in their includes and excludes lists.

This is a sample config for Datalake using Azure:

# Datalake with Azure 

source:
  type: datalake
  serviceName: local_datalake
  serviceConnection:
    config:
      type: Datalake
      configSource:      
        securityConfig: 
          clientId: client-id
          clientSecret: client-secret
          tenantId: tenant-id
          accountName: account-name
      prefix: prefix
  sourceConfig:
    config:
      tableFilterPattern:
        includes:
        - ''
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>

Source Configuration - Service Connection using Azure

The sourceConfig is defined here.

  • Client ID: Client ID of the data storage account
  • Client Secret: Client Secret of the account
  • Tenant ID: Tenant ID under which the data storage account falls
  • Account Name: Account Name of the data storage

schemaFilterPattern and tableFilterPattern: schemaFilterPattern filters containers and tableFilterPattern filters files. Both support regular expressions in their includes and excludes lists. For example:

schemaFilterPattern:
  includes:
    - container1
  excludes:
    - container2
tableFilterPattern:
  includes:
    - '.*json'
Sink Configuration

To send the metadata to OpenMetadata, the sink needs to be specified as type: metadata-rest.

Workflow Configuration

The main property here is the openMetadataServerConfig, where you can define the host and security provider of your OpenMetadata installation.

For a simple, local installation using our docker containers, this looks like:

workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: openmetadata
    securityConfig:
      jwtToken: '{bot_jwt_token}'
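
The same `workflowConfig` can be built as a dictionary so that the bot token comes from the environment instead of being pasted into the file. This is a sketch under stated assumptions: the function name `build_workflow_config` and the environment variable `OPENMETADATA_JWT_TOKEN` are hypothetical, while the keys mirror the local example above.

```python
import os
from typing import Mapping, Optional


def build_workflow_config(
    host_port: str = "http://localhost:8585/api",
    env: Optional[Mapping[str, str]] = None,
) -> dict:
    """Build the `workflowConfig` section for a JWT-secured installation."""
    env = os.environ if env is None else env
    return {
        "openMetadataServerConfig": {
            "hostPort": host_port,
            "authProvider": "openmetadata",
            "securityConfig": {
                # Assumed variable name holding the ingestion bot's JWT token.
                "jwtToken": env["OPENMETADATA_JWT_TOKEN"],
            },
        }
    }
```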

We support different security providers. You can find their definitions here. The different implementations for the ingestion are listed below.

Configure SSO in the Ingestion Workflows

Create a Python file in your Airflow DAGs directory with the following contents:

import yaml
from datetime import timedelta
from airflow import DAG

# Airflow 2.x import path, with a fallback for Airflow 1.10.x
try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from metadata.ingestion.api.workflow import Workflow
from airflow.utils.dates import days_ago

default_args = {
    "owner": "user_name",
    "email": ["username@org.com"],
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60)
}

config = """
<your YAML configuration>
"""

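def metadata_ingestion_workflow():
    # Parse the YAML string above into a dictionary
    workflow_config = yaml.safe_load(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    # Raise if the run reported failures, so that the Airflow task fails too
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()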

with DAG(
    "sample_data",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    schedule_interval='*/5 * * * *',
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )

Note that from connector to connector, this recipe will always be the same. By updating the YAML configuration, you will be able to extract metadata from different sources.
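
Because the recipe stays the same and only the configuration changes, it can help to check the parsed configuration before it reaches `Workflow.create`. The following sketch looks only for the top-level keys that appear in the sample configs on this page; it is not OpenMetadata's JSON Schema validation, and the names `REQUIRED_KEYS` and `missing_config_keys` are hypothetical.

```python
# Sections and keys that every sample config on this page contains.
REQUIRED_KEYS = {
    "source": ("type", "serviceName", "serviceConnection", "sourceConfig"),
    "sink": ("type",),
    "workflowConfig": ("openMetadataServerConfig",),
}


def missing_config_keys(config: dict) -> list:
    """Return dotted names of expected keys that are absent from `config`."""
    missing = []
    for section, keys in REQUIRED_KEYS.items():
        body = config.get(section)
        if not isinstance(body, dict):
            # The whole section is absent or not a mapping.
            missing.append(section)
            continue
        missing.extend(f"{section}.{key}" for key in keys if key not in body)
    return missing
```

Inside `metadata_ingestion_workflow`, such a check could run right after `yaml.safe_load(config)` and raise a `ValueError` when the returned list is not empty.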

You can learn more about how to ingest dbt models' definitions and their lineage here.
