Run Redash Connector with the Airflow SDK
Configure and schedule Redash metadata workflows using your own Airflow instance.

Requirements

Follow this guide to learn how to set up Airflow to run the metadata ingestions.

Metadata Ingestion

All connectors are now defined as JSON Schemas. Here you can find the structure to create a connection to Redash.
To create and run a Metadata Ingestion workflow, we will follow the steps to build a YAML configuration that connects to the source, processes the Entities if needed, and reaches the OpenMetadata server.
The workflow is modeled around the following JSON Schema.

1. Define the YAML Config

This is a sample config for Redash:
source:
  type: redash
  serviceName: local_redash
  serviceConnection:
    config:
      type: Redash
      hostPort: http://localhost:5000
      apiKey: api_key
      username: random
  sourceConfig:
    config:
      dashboardFilterPattern: {}
      chartFilterPattern: {}
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: no-auth

Source Configuration - Service Connection

You can find all the definitions and types for the serviceConnection here.
  • username: Enter the username of your Redash user. The specified user should be authorized to read all the dashboards and charts you want to include in the metadata ingestion workflow.
  • apiKey: Enter the API key for your Redash user, which is used to authenticate against the Redash API.
  • hostPort: Enter the fully qualified hostname and port number of your Redash deployment, e.g., http://localhost:5000.
  • dbServiceName: If you want to create lineage, enter the name of the database service whose tables back your Redash dashboards (see the sketch after this list).
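For illustration, and assuming dbServiceName sits alongside the other connection fields as described in this section (check the JSON Schema linked above for your version), a connection with lineage enabled could look like the sketch below; my_database_service is a placeholder for a database service you have already ingested:

source:
  type: redash
  serviceName: local_redash
  serviceConnection:
    config:
      type: Redash
      hostPort: http://localhost:5000
      apiKey: api_key
      username: random
      # Placeholder: name of an already-ingested database service, used to build lineage
      dbServiceName: my_database_service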

Source Configuration - Source Config

The sourceConfig is defined here.
  • dashboardFilterPattern and chartFilterPattern: Both support regex patterns as include or exclude lists to select which dashboards and charts are ingested. E.g.,
dashboardFilterPattern:
  includes:
    - users
    - type_test
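The same patterns also accept an excludes list to skip matching entities; the names below are placeholders for illustration:

chartFilterPattern:
  excludes:
    # Placeholder names: skip any chart matching these regexes
    - deprecated.*
    - tmp_chart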

Sink Configuration

To send the metadata to OpenMetadata, the sink needs to be specified as type: metadata-rest.
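In the YAML configuration this is the same sink block already shown in the sample above:

sink:
  type: metadata-rest
  config: {}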

Workflow Configuration

The main property here is the openMetadataServerConfig, where you can define the host and security provider of your OpenMetadata installation.
For a simple, local installation using our docker containers, this looks like:
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: no-auth

OpenMetadata Security Providers

We support different security providers. You can find their definitions here. An example of an Auth0 configuration would be the following:
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: auth0
    securityConfig:
      clientId: <client ID>
      secretKey: <secret key>
      domain: <domain>

2. Prepare the Ingestion DAG

Create a Python file in your Airflow DAGs directory with the following contents:
import pathlib
import json
from datetime import timedelta
from airflow import DAG

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow
from airflow.utils.dates import days_ago

default_args = {
    "owner": "user_name",
    "email": ["[email protected]"],
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60)
}

# Paste the JSON configuration from step 1 here
config = """
<your JSON configuration>
"""

def metadata_ingestion_workflow():
    workflow_config = json.loads(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()

with DAG(
    "sample_data",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    schedule_interval='*/5 * * * *',
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
Note that from connector to connector, this recipe will always be the same. By updating the JSON configuration, you will be able to extract metadata from different sources.
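If you want to validate the configuration without scheduling it, the same Workflow class used in the DAG above can be run directly as a plain Python script; this is a minimal sketch, and redash_ingestion.json is a placeholder for a file holding the JSON configuration from step 1:

import json

from metadata.ingestion.api.workflow import Workflow

# Placeholder path: a file containing the same JSON configuration used in the DAG
with open("redash_ingestion.json") as f:
    workflow_config = json.load(f)

workflow = Workflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()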