OpenMetadata
Run Delta Lake Connector with the Airflow SDK
Use your own Airflow instance to schedule and run the Delta Lake Connector.
Configure and schedule Delta Lake metadata and profiler workflows using your own Airflow instances.

Requirements

Follow this guide to learn how to set up Airflow to run the metadata ingestions.

Metadata Ingestion

All connectors are now defined as JSON Schemas. Here you can find the structure to create a connection to Delta Lake.
In order to create and run a Metadata Ingestion workflow, we will follow these steps to create a JSON configuration that can connect to the source, process the Entities if needed, and reach the OpenMetadata server.
The workflow is modeled around the following JSON Schema.

1. Define the JSON Config

This is a sample config for Delta Lake:
{
    "source": {
        "type": "deltalake",
        "serviceName": "<service name>",
        "serviceConnection": {
            "config": {
                // Either metastoreHostPort or metastoreFilePath is required
                "metastoreHostPort": "<metastore host port>",
                "metastoreFilePath": "<path_to_metastore>/metastore_db",
                "appName": "MyApp"
            }
        },
        "sourceConfig": {
            "config": {
                "enableDataProfiler": true or false,
                "markDeletedTables": true or false,
                "includeTables": true or false,
                "includeViews": true or false,
                "generateSampleData": true or false,
                "sampleDataQuery": "<query to fetch table data>",
                "schemaFilterPattern": "<schema name regex list>",
                "tableFilterPattern": "<table name regex list>",
                "dbtConfigSource": "<configs for gcs, s3, local or file server to get the DBT files>"
            }
        }
    },
    "sink": {
        "type": "metadata-rest",
        "config": {}
    },
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "<OpenMetadata host and port>",
            "authProvider": "<OpenMetadata auth provider>"
        }
    }
}

Source Configuration - Service Connection

You can find all the definitions and types for the serviceConnection here.
  • metastoreHostPort (Optional): Enter the host and port of the Hive Metastore to establish a Spark session.
  • metastoreFilePath (Optional): Enter the file path to a local Metastore in case the Spark cluster is running locally.
  • appName (Optional): Enter the app name of the Spark session.
  • connectionOptions (Optional): Enter the details for any additional connection options that can be sent to Delta Lake during the connection. These details must be added as Key-Value pairs.
  • connectionArguments (Optional): Enter the details for any additional connection arguments such as security or protocol configs that can be sent to Delta Lake during the connection. These details must be added as Key-Value pairs, as shown in the sketch after this list.
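For illustration, a serviceConnection block that passes such Key-Value pairs could look like the following minimal sketch; the <key> and <value> entries are placeholders, not settings Delta Lake requires:
"serviceConnection": {
    "config": {
        "metastoreHostPort": "<metastore host port>",
        "appName": "MyApp",
        "connectionOptions": {
            "<key>": "<value>"
        },
        "connectionArguments": {
            "<key>": "<value>"
        }
    }
}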

Source Configuration - Source Config

The sourceConfig is defined here.
  • enableDataProfiler: true or false, to run the profiler (not the tests) during the metadata ingestion.
  • markDeletedTables: To flag tables as soft-deleted if they are not present anymore in the source system.
  • includeTables: true or false, to ingest table data. Default is true.
  • includeViews: true or false, to ingest view definitions.
  • generateSampleData: To ingest sample data based on sampleDataQuery.
  • sampleDataQuery: Defaults to select * from {}.{} limit 50.
  • schemaFilterPattern and tableFilterPattern: Note that the schemaFilterPattern and tableFilterPattern both support regex as include or exclude. E.g.,
"tableFilterPattern": {
    "includes": ["users", "type_test"]
}
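The same filters also accept an excludes list. A minimal sketch, using illustrative schema name patterns rather than values required by the connector:
"schemaFilterPattern": {
    "excludes": ["temp_.*", "staging_.*"]
}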

Sink Configuration

To send the metadata to OpenMetadata, the sink needs to be specified as "type": "metadata-rest".

Workflow Configuration

The main property here is the openMetadataServerConfig, where you can define the host and security provider of your OpenMetadata installation.
For a simple, local installation using our docker containers, this looks like:
"workflowConfig": {
    "openMetadataServerConfig": {
        "hostPort": "http://localhost:8585/api",
        "authProvider": "no-auth"
    }
}

OpenMetadata Security Providers

We support different security providers. You can find their definitions here. An example of an Auth0 configuration would be the following:
"workflowConfig": {
    "openMetadataServerConfig": {
        "hostPort": "http://localhost:8585/api",
        "authProvider": "auth0",
        "securityConfig": {
            "clientId": "<client ID>",
            "secretKey": "<secret key>",
            "domain": "<domain>"
        }
    }
}

2. Prepare the Ingestion DAG

Create a Python file in your Airflow DAGs directory with the following contents:
import pathlib
import json
from datetime import timedelta
from airflow import DAG

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow
from airflow.utils.dates import days_ago

default_args = {
    "owner": "user_name",
    "email": ["[email protected]"],
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60)
}

config = """
<your JSON configuration>
"""

def metadata_ingestion_workflow():
    workflow_config = json.loads(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "sample_data",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    schedule_interval='*/5 * * * *',
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
Note that from connector to connector, this recipe will always be the same. By updating the JSON configuration, you will be able to extract metadata from different sources.
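If you would rather not inline the JSON as a string, the pathlib and load_config_file imports already present in the DAG can read it from a file instead. A minimal sketch of the workflow function under that approach, assuming a hypothetical /airflow/dags/deltalake_config.json path and that load_config_file parses the file into a dict:
import pathlib

from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow


def metadata_ingestion_workflow():
    # Hypothetical location: point this at wherever your Airflow workers
    # can read the JSON configuration from.
    config_file = pathlib.Path("/airflow/dags/deltalake_config.json")
    workflow_config = load_config_file(config_file)

    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
The rest of the DAG definition stays the same.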

Data Profiler and Quality Tests

The Data Profiler workflow uses the orm-profiler processor. While the serviceConnection remains the same to reach the source system, the sourceConfig is updated from the previous configuration.

1. Define the JSON configuration

This is a sample config for a Delta Lake profiler:
{
    "source": {
        "type": "deltalake",
        "serviceName": "<service name>",
        "serviceConnection": {
            "config": {
                // Either metastoreHostPort or metastoreFilePath is required
                "metastoreHostPort": "<metastore host port>",
                "metastoreFilePath": "<path_to_metastore>/metastore_db",
                "appName": "MyApp"
            }
        },
        "sourceConfig": {
            "config": {
                "type": "Profiler",
                "fqnFilterPattern": "<table FQN filtering regex>"
            }
        }
    },
    "processor": {
        "type": "orm-profiler",
        "config": {}
    },
    "sink": {
        "type": "metadata-rest",
        "config": {}
    },
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "<OpenMetadata host and port>",
            "authProvider": "<OpenMetadata auth provider>"
        }
    }
}

Source Configuration

  • You can find all the definitions and types for the serviceConnection here.
  • The sourceConfig is defined here. Even if you don't need to add any fqnFilterPattern, the "type": "Profiler" entry is still required.
Note that the fqnFilterPattern supports regex as include or exclude. E.g.,
"fqnFilterPattern": {
    "includes": ["service.database.schema.*"]
}
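Exclusions follow the same shape; for example, to skip every table under one schema (the FQN below is only illustrative):
"fqnFilterPattern": {
    "excludes": ["service.database.schema_to_skip.*"]
}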

Processor

Here we choose the orm-profiler. Its configuration can also be updated to define tests from the JSON itself instead of the UI:
"processor": {
    "type": "orm-profiler",
    "config": {
        "test_suite": {
            "name": "<Test Suite name>",
            "tests": [
                {
                    "table": "<Table FQN>",
                    "table_tests": [
                        {
                            "testCase": {
                                "config": {
                                    "value": 100
                                },
                                "tableTestType": "tableRowCountToEqual"
                            }
                        }
                    ],
                    "column_tests": [
                        {
                            "columnName": "<Column Name>",
                            "testCase": {
                                "config": {
                                    "minValue": 0,
                                    "maxValue": 99
                                },
                                "columnTestType": "columnValuesToBeBetween"
                            }
                        }
                    ]
                }
            ]
        }
    }
},
tests is a list of test definitions that will be applied to a table, identified by its FQN. For each table, one can then define a list of table_tests and column_tests. Review the supported tests and their definitions here to learn how to configure the different cases.

Workflow Configuration

The same configuration as for the metadata ingestion.

2. Prepare the Ingestion DAG

Here, we follow a similar approach as with the metadata and usage pipelines, although we will use a different Workflow class:
import json
from datetime import timedelta

from airflow import DAG

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago

from metadata.orm_profiler.api.workflow import ProfilerWorkflow


default_args = {
    "owner": "user_name",
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(seconds=10),
    "execution_timeout": timedelta(minutes=60),
}

config = """
<your JSON configuration>
"""

def metadata_ingestion_workflow():
    workflow_config = json.loads(config)
    workflow = ProfilerWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()

with DAG(
    "profiler_example",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="profile_and_test_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )

DBT Integration

You can learn more about how to ingest DBT models' definitions and their lineage here.