Run the ingestion from your Airflow

OpenMetadata integrates with Airflow to orchestrate ingestion workflows. You can use Airflow to extract metadata and [deploy workflows](/deployment/ingestion/openmetadata) directly. This guide explains how to run ingestion workflows in Airflow using three different operators:

  1. Python Operator
  2. Docker Operator
  3. Python Virtualenv Operator

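Python Operator

Install the openmetadata-ingestion package, with the plugins for your sources, in your Airflow environment using pip install "openmetadata-ingestion[<plugin>]==x.y.z". This approach works best if you have access to the Airflow host and can manage its dependencies.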

  • Replace <plugin> with the sources to ingest, such as mysql, snowflake, or s3.
  • Replace x.y.z with the OpenMetadata version matching your server (e.g., 1.6.1).
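
The requirement string combines the plugin extras with a pinned version. The sketch below builds it in one place so the same value can be passed to pip or to an operator's requirements list. ingestion_requirement is a hypothetical helper, not part of OpenMetadata, and the version check is a loose assumption about the x.y.z release format.

```python
import re
from typing import Iterable

# Loose check for x.y.z (optionally x.y.z.w) release numbers; an assumption, not an official rule.
_VERSION_RE = re.compile(r"\d+\.\d+\.\d+(?:\.\d+)?")


def ingestion_requirement(plugins: Iterable[str], version: str) -> str:
    """Build a pinned requirement such as 'openmetadata-ingestion[mysql,s3]==1.6.1'.

    Hypothetical helper: the version should match your OpenMetadata server.
    """
    if not _VERSION_RE.fullmatch(version):
        raise ValueError(f"expected a version like x.y.z, got {version!r}")
    extras = sorted(set(plugins))  # de-duplicate and sort for a stable string
    if not extras:
        return f"openmetadata-ingestion=={version}"
    return f"openmetadata-ingestion[{','.join(extras)}]=={version}"
```

For example, ingestion_requirement(["mysql", "s3"], "1.6.1") returns "openmetadata-ingestion[mysql,s3]==1.6.1". Quote that value when passing it to pip install, since shells such as zsh treat square brackets as glob characters.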

  • Function Setup: The PythonOperator calls the function passed as python_callable, here metadata_ingestion_workflow, which instantiates the workflow and runs the ingestion process.
  • Drawback: This method requires the dependencies to be pre-installed on the Airflow host, which is not always feasible. Consider the DockerOperator or the PythonVirtualenvOperator as alternatives.
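
A minimal DAG for this approach might look like the sketch below. It assumes Airflow 2.4 or later (for the schedule argument) and the create/execute/raise_from_status/print_status/stop method sequence used in OpenMetadata 1.x examples, so check it against your installed version. The DAG id, task id, and the run_workflow and build_dag helpers are hypothetical; the Airflow and OpenMetadata imports are kept inside functions so the helpers can be read and tested without those packages.

```python
from typing import Any

# Placeholder: replace with the YAML workflow configuration of your connector.
CONFIG = """
<your YAML workflow configuration>
"""


def run_workflow(workflow_cls: Any, workflow_config: dict) -> None:
    """Create a workflow from its config dict, run it, and always clean up."""
    workflow = workflow_cls.create(workflow_config)
    try:
        workflow.execute()
        # Raise if the workflow reported failures, so the Airflow task is marked failed.
        workflow.raise_from_status()
    finally:
        workflow.print_status()
        workflow.stop()


def metadata_ingestion_workflow() -> None:
    """The python_callable: parse the YAML config and run the Metadata workflow."""
    import yaml  # PyYAML is a dependency of Airflow itself
    from metadata.workflow.metadata import MetadataWorkflow

    run_workflow(MetadataWorkflow, yaml.safe_load(CONFIG))


def build_dag():
    """Wire the callable into a DAG with a single PythonOperator task."""
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="openmetadata_metadata_ingestion",
        start_date=datetime(2024, 1, 1),
        schedule="@daily",
        catchup=False,
    ) as dag:
        PythonOperator(
            task_id="ingest_metadata",
            python_callable=metadata_ingestion_workflow,
        )
    return dag
```

In the DAG file, call dag = build_dag() at module level so the DAG is created when Airflow parses the file. The try/finally is a design choice of this sketch: stop() still runs when raise_from_status() fails the task.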

Docker Operator

For this operator, we can use the openmetadata/ingestion-base image. It lets you prepare DAGs without installing anything in the Airflow environment, although the host must be able to run Docker commands.

For Docker Compose setups, mount the host's Docker socket into the Airflow containers, for example by adding /var/run/docker.sock:/var/run/docker.sock to their volumes.

  • Image Version: Ensure the Docker image version matches your OpenMetadata server version (e.g., openmetadata/ingestion-base:0.13.2).
  • Pipeline Types: Set the pipelineType to metadata, usage, lineage, profiler, or other supported values.
  • No Installation Required: The DockerOperator eliminates the need to install dependencies directly on the Airflow host.

The DockerOperator must also be able to reach the Docker daemon to create the task container. Since our example runs Airflow in Docker Compose, that meant setting docker_url="unix://var/run/docker.sock".

The final important elements here are:

  • command="python main.py": Leave this unchanged; the image ships the main.py script that triggers the workflow.
  • environment={"config": config, "pipelineType": "metadata"}: In most cases, you only need to update the config string with the YAML configuration of your connector.

Other supported values of pipelineType are usage, lineage, profiler, dataInsight, elasticSearchReindex, dbt, application, and TestSuite. Set pipelineType according to the workflow you want to run, and make sure the YAML config contains everything that workflow requires.
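
The DockerOperator parameters described above can be combined as in the following sketch. It assumes the apache-airflow-providers-docker package is installed. PIPELINE_TYPES, docker_environment, ingestion_image, and build_docker_task are hypothetical helpers, and the pipelineType set only mirrors the values listed in this guide, so newer OpenMetadata releases may accept others.

```python
from typing import Dict

# pipelineType values listed in this guide; newer releases may accept others.
PIPELINE_TYPES = frozenset(
    {
        "metadata", "usage", "lineage", "profiler", "dataInsight",
        "elasticSearchReindex", "dbt", "application", "TestSuite",
    }
)


def docker_environment(config: str, pipeline_type: str) -> Dict[str, str]:
    """Environment variables expected by main.py inside the ingestion-base image."""
    if pipeline_type not in PIPELINE_TYPES:
        raise ValueError(f"unsupported pipelineType: {pipeline_type!r}")
    return {"config": config, "pipelineType": pipeline_type}


def ingestion_image(server_version: str) -> str:
    """Pin the image tag to the OpenMetadata server version."""
    return f"openmetadata/ingestion-base:{server_version}"


def build_docker_task(config: str, server_version: str, pipeline_type: str = "metadata"):
    """Create the DockerOperator task; call this inside a `with DAG(...)` block."""
    # Requires the apache-airflow-providers-docker package.
    from airflow.providers.docker.operators.docker import DockerOperator

    return DockerOperator(
        task_id=f"ingest_{pipeline_type.lower()}",
        image=ingestion_image(server_version),
        command="python main.py",  # main.py ships inside the image
        environment=docker_environment(config, pipeline_type),
        # Docker Compose setup: talk to the host daemon through the mounted socket.
        docker_url="unix://var/run/docker.sock",
    )
```

Calling build_docker_task(CONFIG, "1.6.1") inside a with DAG(...) block adds the task. Rejecting unknown pipelineType values in docker_environment is a choice of this sketch: typos surface when Airflow parses the DAG rather than inside the container.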

Python Virtualenv Operator

As stated in Airflow's docs, install the virtualenv package on the Airflow host. If the virtual environment uses a different Python version than Airflow (e.g., Python 3.9 while Airflow uses 3.7), install additional packages such as:

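Function rules: because the PythonVirtualenvOperator executes the callable's source code inside the new virtual environment, the function must follow these rules: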

  • Use a def function (not part of a class).
  • All imports must occur inside the function.
  • Avoid referencing variables outside the function's scope.

We have different classes for different types of workflows. The logic is always the same, but you will need to change your import path. The rest of the method calls will remain the same.

For example, for the Metadata workflow we'll use from metadata.workflow.metadata import MetadataWorkflow.

The classes for each workflow type are:

  • Metadata: from metadata.workflow.metadata import MetadataWorkflow
  • Lineage: from metadata.workflow.metadata import MetadataWorkflow (same as metadata)
  • Usage: from metadata.workflow.usage import UsageWorkflow
  • dbt: from metadata.workflow.metadata import MetadataWorkflow
  • Profiler: from metadata.workflow.profiler import ProfilerWorkflow
  • Data Quality: from metadata.workflow.data_quality import TestSuiteWorkflow
  • Data Insights: from metadata.workflow.data_insight import DataInsightWorkflow
  • Elasticsearch Reindex: from metadata.workflow.metadata import MetadataWorkflow (same as metadata)
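
Because only the import path changes between workflow types, one option is to look the class up from a table and import it lazily, as in the sketch below. Keying the table by pipelineType is an assumption made for illustration (application is left out because this guide lists no class for it), and WORKFLOW_CLASSES and load_workflow_class are hypothetical names, not OpenMetadata APIs.

```python
import importlib
from typing import Dict

# pipelineType -> "module.ClassName", transcribed from the list of classes above.
WORKFLOW_CLASSES: Dict[str, str] = {
    "metadata": "metadata.workflow.metadata.MetadataWorkflow",
    "lineage": "metadata.workflow.metadata.MetadataWorkflow",
    "usage": "metadata.workflow.usage.UsageWorkflow",
    "dbt": "metadata.workflow.metadata.MetadataWorkflow",
    "profiler": "metadata.workflow.profiler.ProfilerWorkflow",
    "TestSuite": "metadata.workflow.data_quality.TestSuiteWorkflow",
    "dataInsight": "metadata.workflow.data_insight.DataInsightWorkflow",
    "elasticSearchReindex": "metadata.workflow.metadata.MetadataWorkflow",
}


def load_workflow_class(pipeline_type: str) -> type:
    """Import the module for a pipelineType on demand and return its workflow class."""
    try:
        path = WORKFLOW_CLASSES[pipeline_type]
    except KeyError:
        raise ValueError(f"no workflow class listed for pipelineType {pipeline_type!r}") from None
    module_name, class_name = path.rsplit(".", 1)
    return getattr(importlib.import_module(module_name), class_name)
```

In a callable, load_workflow_class("profiler").create(workflow_config) then replaces the hard-coded import, and the remaining method calls stay as they are.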