OpenMetadata
Search…
Run Metadata Ingestion
Below is the sample code example you can refer to integrate with Airflow

Airflow Example for Sample Data

1
import pathlib
2
import json
3
from datetime import timedelta
4
from airflow import DAG
5
6
try:
7
from airflow.operators.python import PythonOperator
8
except ModuleNotFoundError:
9
from airflow.operators.python_operator import PythonOperator
10
11
from metadata.config.common import load_config_file
12
from metadata.ingestion.api.workflow import Workflow
13
from airflow.utils.dates import days_ago
14
15
default_args = {
16
"owner": "user_name",
17
"email": ["[email protected]"],
18
"email_on_failure": False,
19
"retries": 3,
20
"retry_delay": timedelta(minutes=5),
21
"execution_timeout": timedelta(minutes=60)
22
}
23
24
config = """
25
{
26
"source": {
27
"type": "sample-data",
28
"config": {
29
"sample_data_folder": "./examples/sample_data"
30
}
31
},
32
"sink": {
33
"type": "metadata-rest",
34
"config": {}
35
},
36
"metadata_server": {
37
"type": "metadata-server",
38
"config": {
39
"api_endpoint": "http://localhost:8585/api",
40
"auth_provider_type": "no-auth"
41
}
42
}
43
}
44
"""
45
46
def metadata_ingestion_workflow():
47
workflow_config = json.loads(config)
48
workflow = Workflow.create(workflow_config)
49
workflow.execute()
50
workflow.raise_from_status()
51
workflow.print_status()
52
workflow.stop()
53
54
55
with DAG(
56
"sample_data",
57
default_args=default_args,
58
description="An example DAG which runs a OpenMetadata ingestion workflow",
59
start_date=days_ago(1),
60
is_paused_upon_creation=False,
61
schedule_interval='*/5 * * * *',
62
catchup=False,
63
) as dag:
64
ingest_task = PythonOperator(
65
task_id="ingest_using_recipe",
66
python_callable=metadata_ingestion_workflow,
67
)
Copied!
we are using a python method like below
1
def metadata_ingestion_workflow():
2
workflow_config = json.loads(config)
3
workflow = Workflow.create(workflow_config)
4
workflow.execute()
5
workflow.raise_from_status()
6
workflow.print_status()
7
workflow.stop
Copied!
Create a Workflow instance and pass a sample-data configuration which will read metadata from Json files and ingest it into the OpenMetadata Server. You can customize this configuration or add different connectors please refer to our examples and refer to Connectors.
Last modified 10d ago
Copy link