from datetime import timedelta
try:
    # Preferred import path for PythonOperator.
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    # Fall back to the legacy module when the preferred one is not installed.
    from airflow.operators.python_operator import PythonOperator
from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow
from airflow.utils.dates import days_ago
# NOTE(review): these entries appear to belong to the `default_args` mapping
# that is passed to the DAG further down — confirm the enclosing dict literal.
# Failure e-mail notifications are turned off.
"email_on_failure": False,
# Five-minute pause between retry attempts.
"retry_delay": timedelta(minutes=5),
# One-hour limit on a single task execution.
"execution_timeout": timedelta(minutes=60)
<your JSON configuration>
def metadata_ingestion_workflow():
    """Run the OpenMetadata ingestion workflow described by ``config``.

    Parses the module-level ``config`` JSON string, builds a ``Workflow``
    from it, executes it, and then checks the resulting status. Any
    exception propagates to the caller (the Airflow task that uses this
    function as its ``python_callable``).

    Raises:
        json.JSONDecodeError: If ``config`` is not valid JSON.
        Exception: Anything raised by ``Workflow.create``,
            ``Workflow.execute`` or ``Workflow.raise_from_status``.
    """
    workflow_config = json.loads(config)
    workflow = Workflow.create(workflow_config)
    try:
        # The workflow must actually run before its status is meaningful;
        # without this call nothing is ingested.
        workflow.execute()
        workflow.raise_from_status()
    finally:
        # Always shut the workflow down, even when a step above fails.
        workflow.stop()
# NOTE(review): these look like keyword arguments to the DAG constructor —
# confirm the enclosing call.
# Reuse the shared `default_args` mapping for the DAG's tasks.
default_args=default_args,
description="An example DAG which runs a OpenMetadata ingestion workflow",
# The DAG is created unpaused.
is_paused_upon_creation=False,
# Cron expression: every five minutes.
schedule_interval='*/5 * * * *',
ingest_task = PythonOperator(
task_id="ingest_using_recipe",
python_callable=metadata_ingestion_workflow,