Manging Credentials
On the release 0.12 we updated how services credentials are handled from an Ingestion Workflow. We are covering now two scenarios:
- If we are running a metadata workflow for the first time, pointing to a service that does not yet exist, then the service will be created from the Metadata Ingestion pipeline. It does not matter if the workflow is run from the CLI or any other scheduler.
- If instead, there is an already existing service to which we are pointing with a Metadata Ingestion pipeline, then we will be using the stored credentials, not the ones incoming from the YAML config.
Existing Services
What this means is that once a service is created, the only way to update its connection credentials is via the UI or directly running an API call. This prevents the scenario where a new YAML config is created, using a name of a service that already exists, but pointing to a completely different source system.
One of the main benefits of this approach is that if an admin in our organisation creates the service from the UI, then we can prepare any Ingestion Workflow without having to pass the connection details.
For example, for an Athena YAML, instead of requiring the full set of credentials as below:
source:
type: athena
serviceName: my_athena_service
serviceConnection:
config:
type: Athena
awsConfig:
awsAccessKeyId: KEY
awsSecretAccessKey: SECRET
awsRegion: us-east-2
s3StagingDir: s3 directory for datasource
workgroup: workgroup name
sourceConfig:
type: DatabaseMetadata
config:
markDeletedTables: true
includeTables: true
includeViews: true
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: <OpenMetadata host and port>
authProvider: <OpenMetadata auth provider>
We can use a simplified version:
source:
type: athena
serviceName: my_athena_service
sourceConfig:
config:
type: DatabaseMetadata
markDeletedTables: true
includeTables: true
includeViews: true
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: <OpenMetadata host and port>
authProvider: <OpenMetadata auth provider>
The workflow will then dynamically pick up the service connection details for my_athena_service
and ingest
the metadata accordingly.
If instead, you want to have the full source of truth in your DAGs or processes, you can keep reading on different ways to secure the credentials in your environment and not have them at plain sight.
Securing Credentials
Note
Note that these are just a few examples. Any secure and automated approach to retrieve a string would work here, as our only requirement is to pass the string inside the YAML configuration.
When running Workflow with the CLI or your favourite scheduler, it's safer to not have the services' credentials visible. For the CLI, the ingestion package can load sensitive information from environment variables.
For example, if you are using the Glue connector you could specify the AWS configurations as follows in the case of a JSON config file
[...]
"awsConfig": {
"awsAccessKeyId": "${AWS_ACCESS_KEY_ID}",
"awsSecretAccessKey": "${AWS_SECRET_ACCESS_KEY}",
"awsRegion": "${AWS_REGION}",
"awsSessionToken": "${AWS_SESSION_TOKEN}"
},
[...]
Or
[...]
awsConfig:
awsAccessKeyId: '${AWS_ACCESS_KEY_ID}'
awsSecretAccessKey: '${AWS_SECRET_ACCESS_KEY}'
awsRegion: '${AWS_REGION}'
awsSessionToken: '${AWS_SESSION_TOKEN}'
[...]
for a YAML configuration.
AWS Credentials
The AWS Credentials are based on the following JSON Schema.
Note that the only required field is the awsRegion
. This configuration is rather flexible to allow installations under AWS
that directly use instance roles for permissions to authenticate to whatever service we are pointing to without having to
write the credentials down.
AWS Vault
If using aws-vault, it gets a bit more involved to run the CLI ingestion as the credentials are not globally available in the terminal. In that case, you could use the following command after setting up the ingestion configuration file:
aws-vault exec <role> -- $SHELL -c 'metadata ingest -c <path to connector>'
GCS Credentials
The GCS Credentials are based on the following JSON Schema. These are the fields that you can export when preparing a Service Account.
Once the account is created, you can see the fields in the exported JSON file from:
IAM & Admin > Service Accounts > Keys
You can validate the whole Google service account setup here.
Using Airflow Connections
In any connector page, you might have seen an example on how to build a DAG to run the ingestion with Airflow (e.g., Athena).
A possible approach to retrieving sensitive information from Airflow would be using Airflow's Connections. Note that these connections can be stored as environment variables, to Airflow's underlying DB or to multiple external services such as Hashicorp Vault. Note that for external systems, you'll need to provide the necessary package and configure the Secrets Backend. The best way to choose how to store these credentials is to go through Airflow's docs.
Example
Let's go over an example on how to create a connection to extract data from MySQL and how a DAG would look like afterwards.
Step 1 - Create the Connection
From our Airflow host, (e.g., docker exec -it openmetadata_ingestion bash
if testing in Docker), you can run:
airflow connections add 'my_mysql_db' \
--conn-uri 'mysql+pymysql://openmetadata_user:openmetadata_password@mysql:3306/openmetadata_db'
You will see an output like
Successfully added `conn_id`=my_mysql_db : mysql+pymysql://openmetadata_user:openmetadata_password@mysql:3306/openmetadata_db
Checking the credentials from the Airflow UI, we will see:

Step 2 - Understanding the shape of a Connection
In the same host, we can open a Python shell to explore the Connection object with some more details. To do so, we first
need to pick up the connection from Airflow. We will use the BaseHook
for that as the connection is not stored
in any external system.
from airflow.hooks.base import BaseHook
# Retrieve the connection
connection = BaseHook.get_connection("my_mysql_db")
# Access the connection details
connection.host # 'mysql'
connection.port # 3306
connection.login # 'openmetadata_user'
connection.password # 'openmetadata_password'
Based on this information, we now know how to prepare the DAG!
Step 3 - Write the DAG
A full example on how to write a DAG to ingest data from our Connection can look like this:
import pathlib
import yaml
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator
from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow
# Import the hook
from airflow.hooks.base import BaseHook
# Retrieve the connection
connection = BaseHook.get_connection("my_mysql_db")
# Use the connection details when setting the YAML
# Note how we escaped the braces as {{}} to not be parsed by the f-string
config = f"""
source:
type: mysql
serviceName: mysql_from_connection
serviceConnection:
config:
type: Mysql
username: {connection.login}
password: {connection.password}
hostPort: {connection.host}:{connection.port}
# databaseSchema: schema
sourceConfig:
config:
markDeletedTables: true
includeTables: true
includeViews: true
sink:
type: metadata-rest
config: {{}}
workflowConfig:
openMetadataServerConfig:
hostPort: "<OpenMetadata host and port>"
authProvider: "<OpenMetadata auth provider>"
"""
def metadata_ingestion_workflow():
workflow_config = yaml.safe_load(config)
workflow = Workflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
with DAG(
"mysql_connection_ingestion",
description="An example DAG which runs a OpenMetadata ingestion workflow",
start_date=days_ago(1),
is_paused_upon_creation=False,
schedule_interval='*/5 * * * *',
catchup=False,
) as dag:
ingest_task = PythonOperator(
task_id="ingest_using_recipe",
python_callable=metadata_ingestion_workflow,
)
Option B - Reuse an existing Service
Following the explanation at the beginning of this doc, we can reuse the credentials from an existing service
in a DAG as well, and just omit the serviceConnection
YAML entries:
import pathlib
import yaml
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator
from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow
config = """
source:
type: mysql
serviceName: existing_mysql_service
sourceConfig:
config:
markDeletedTables: true
includeTables: true
includeViews: true
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: "<OpenMetadata host and port>"
authProvider: "<OpenMetadata auth provider>"
"""
def metadata_ingestion_workflow():
workflow_config = yaml.safe_load(config)
workflow = Workflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
with DAG(
"mysql_connection_ingestion",
description="An example DAG which runs a OpenMetadata ingestion workflow",
start_date=days_ago(1),
is_paused_upon_creation=False,
schedule_interval='*/5 * * * *',
catchup=False,
) as dag:
ingest_task = PythonOperator(
task_id="ingest_using_recipe",
python_callable=metadata_ingestion_workflow,
)