Manging Credentials

On the release 0.12 we updated how services credentials are handled from an Ingestion Workflow. We are covering now two scenarios:

  1. If we are running a metadata workflow for the first time, pointing to a service that does not yet exist, then the service will be created from the Metadata Ingestion pipeline. It does not matter if the workflow is run from the CLI or any other scheduler.
  2. If instead, there is an already existing service to which we are pointing with a Metadata Ingestion pipeline, then we will be using the stored credentials, not the ones incoming from the YAML config.

What this means is that once a service is created, the only way to update its connection credentials is via the UI or directly running an API call. This prevents the scenario where a new YAML config is created, using a name of a service that already exists, but pointing to a completely different source system.

One of the main benefits of this approach is that if an admin in our organisation creates the service from the UI, then we can prepare any Ingestion Workflow without having to pass the connection details.

For example, for an Athena YAML, instead of requiring the full set of credentials as below:

source:
  type: athena
  serviceName: my_athena_service
  serviceConnection:
    config:
      type: Athena
      awsConfig:
        awsAccessKeyId: KEY
        awsSecretAccessKey: SECRET
        awsRegion: us-east-2
      s3StagingDir: s3 directory for datasource
      workgroup: workgroup name
  sourceConfig:
    config:
      markDeletedTables: true
      includeTables: true
      includeViews: true
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>

We can use a simplified version:

source:
  type: athena
  serviceName: my_athena_service
  sourceConfig:
    config:
      markDeletedTables: true
      includeTables: true
      includeViews: true
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>

The workflow will then dynamically pick up the service connection details for my_athena_service and ingest the metadata accordingly.

If instead, you want to have the full source of truth in your DAGs or processes, you can keep reading on different ways to secure the credentials in your environment and not have them at plain sight.

Note

Note that these are just a few examples. Any secure and automated approach to retrieve a string would work here, as our only requirement is to pass the string inside the YAML configuration.

When running Workflow with the CLI or your favourite scheduler, it's safer to not have the services' credentials visible. For the CLI, the ingestion package can load sensitive information from environment variables.

For example, if you are using the Glue connector you could specify the AWS configurations as follows in the case of a JSON config file

[...]
"awsConfig": {
    "awsAccessKeyId": "${AWS_ACCESS_KEY_ID}",
    "awsSecretAccessKey": "${AWS_SECRET_ACCESS_KEY}",
    "awsRegion": "${AWS_REGION}",
    "awsSessionToken": "${AWS_SESSION_TOKEN}"
},
[...]

Or

[...]
awsConfig:
  awsAccessKeyId: '${AWS_ACCESS_KEY_ID}'
  awsSecretAccessKey: '${AWS_SECRET_ACCESS_KEY}'
  awsRegion: '${AWS_REGION}'
  awsSessionToken: '${AWS_SESSION_TOKEN}'
[...]

for a YAML configuration.

The AWS Credentials are based on the following JSON Schema. Note that the only required field is the awsRegion. This configuration is rather flexible to allow installations under AWS that directly use instance roles for permissions to authenticate to whatever service we are pointing to without having to write the credentials down.

AWS Vault

If using aws-vault, it gets a bit more involved to run the CLI ingestion as the credentials are not globally available in the terminal. In that case, you could use the following command after setting up the ingestion configuration file:

aws-vault exec <role> -- $SHELL -c 'metadata ingest -c <path to connector>'

The GCS Credentials are based on the following JSON Schema. These are the fields that you can export when preparing a Service Account.

Once the account is created, you can see the fields in the exported JSON file from:

IAM & Admin > Service Accounts > Keys

You can validate the whole Google service account setup here.

In any connector page, you might have seen an example on how to build a DAG to run the ingestion with Airflow (e.g., Athena).

A possible approach to retrieving sensitive information from Airflow would be using Airflow's Connections. Note that these connections can be stored as environment variables, to Airflow's underlying DB or to multiple external services such as Hashicorp Vault. Note that for external systems, you'll need to provide the necessary package and configure the Secrets Backend. The best way to choose how to store these credentials is to go through Airflow's docs.

Example

Let's go over an example on how to create a connection to extract data from MySQL and how a DAG would look like afterwards.

Step 1 - Create the Connection

From our Airflow host, (e.g., docker exec -it openmetadata_ingestion bash if testing in Docker), you can run:

airflow connections add 'my_mysql_db' \
    --conn-uri 'mysql+pymysql://openmetadata_user:openmetadata_password@mysql:3306/openmetadata_db'

You will see an output like

Successfully added `conn_id`=my_mysql_db : mysql+pymysql://openmetadata_user:openmetadata_password@mysql:3306/openmetadata_db

Checking the credentials from the Airflow UI, we will see:

Airflow Connection

Step 2 - Understanding the shape of a Connection

In the same host, we can open a Python shell to explore the Connection object with some more details. To do so, we first need to pick up the connection from Airflow. We will use the BaseHook for that as the connection is not stored in any external system.

from airflow.hooks.base import BaseHook

# Retrieve the connection
connection = BaseHook.get_connection("my_mysql_db")

# Access the connection details
connection.host  # 'mysql'
connection.port  # 3306
connection.login  # 'openmetadata_user'
connection.password  # 'openmetadata_password'

Based on this information, we now know how to prepare the DAG!

Step 3 - Write the DAG

A full example on how to write a DAG to ingest data from our Connection can look like this:

import pathlib
import yaml
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow

# Import the hook
from airflow.hooks.base import BaseHook

# Retrieve the connection
connection = BaseHook.get_connection("my_mysql_db")

# Use the connection details when setting the YAML
# Note how we escaped the braces as {{}} to not be parsed by the f-string
config = f"""
source:
  type: mysql
  serviceName: mysql_from_connection
  serviceConnection:
    config:
      type: Mysql
      username: {connection.login}
      password: {connection.password}
      hostPort: {connection.host}:{connection.port}
      # databaseSchema: schema
  sourceConfig:
    config:
      markDeletedTables: true
      includeTables: true
      includeViews: true
sink:
  type: metadata-rest
  config: {{}}
workflowConfig:
  openMetadataServerConfig:
    hostPort: "<OpenMetadata host and port>"
    authProvider: "<OpenMetadata auth provider>"
"""

def metadata_ingestion_workflow():
    workflow_config = yaml.safe_load(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()

with DAG(
    "mysql_connection_ingestion",
    description="An example DAG which runs a OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    schedule_interval='*/5 * * * *',
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )

Option B - Reuse an existing Service

Following the explanation at the beginning of this doc, we can reuse the credentials from an existing service in a DAG as well, and just omit the serviceConnection YAML entries:

import pathlib
import yaml
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow

config = """
source:
  type: mysql
  serviceName: existing_mysql_service
  sourceConfig:
    config:
      markDeletedTables: true
      includeTables: true
      includeViews: true
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: "<OpenMetadata host and port>"
    authProvider: "<OpenMetadata auth provider>"
"""

def metadata_ingestion_workflow():
    workflow_config = yaml.safe_load(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()

with DAG(
    "mysql_connection_ingestion",
    description="An example DAG which runs a OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    schedule_interval='*/5 * * * *',
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )

Still have questions?

You can take a look at our Q&A or reach out to us in Slack

Was this page helpful?

editSuggest edits