Extract GCS Composer Metadata
Note: This approach has been tested against Airflow 2.1.4 and 2.2.5. If you have any issues or questions, please do not hesitate to reach out!
There are two main approaches we can follow here to extract metadata from GCS Composer. Both of them involve creating a DAG directly in your Composer instance, but the requirements and the steps to follow are going to be slightly different.
Feel free to choose whatever approach adapts best to your current architecture and constraints.
Using the Python Operator
The most convenient way to extract metadata out of GCS Composer is to create a DAG directly in Composer that will handle the connection to the metadata database automatically and push the contents to your OpenMetadata server.
The drawback here? You need to install openmetadata-ingestion directly on the host. This might cause incompatibilities with your current Python environment and/or the internal (and changing) Composer requirements. In any case, once the requirements are in place, preparing the DAG is straightforward.
Install the Requirements
In your environment you will need to install the following packages:
- sqlalchemy==1.4.27: This is needed to align the OpenMetadata version with the Composer internal requirements.
- flask-appbuilder==3.4.5: Again, this is just an alignment of versions so that openmetadata-ingestion can work with GCS Composer internals.
Note: Make sure to use the openmetadata-ingestion version that matches the server version you currently have!
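As a reference, the set of PyPI packages added to the Composer environment could end up looking like the sketch below. The openmetadata-ingestion version shown is only an example; pick the one that matches your server.

```
openmetadata-ingestion==0.13.2
sqlalchemy==1.4.27
flask-appbuilder==3.4.5
```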
Prepare the DAG!
Note that this DAG is a usual connector DAG, just using the Airflow service with the Backend connection, i.e., reading directly from Composer's own metadata database.
As an example of a DAG pushing data to OpenMetadata under Google SSO, we could have:
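The following is a minimal sketch of such a DAG, assuming the OpenMetadata 0.13-style Python API (metadata.ingestion.api.workflow.Workflow). The service name, host/port, schedule, and credentials path are placeholders to adapt to your environment.

```python
import yaml
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from metadata.ingestion.api.workflow import Workflow

# Ingestion recipe: read pipeline metadata from Composer's own backend DB
# and push it to the OpenMetadata server, authenticating via Google SSO.
config = """
source:
  type: airflow
  serviceName: airflow_gcs_composer
  serviceConnection:
    config:
      type: Airflow
      hostPort: http://localhost:8080
      connection:
        type: Backend
  sourceConfig:
    config:
      type: PipelineMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: https://<your-openmetadata-host>/api
    authProvider: google
    securityConfig:
      secretKey: /home/airflow/gcs/data/gcs_creds_beta.json
"""


def metadata_ingestion_workflow():
    # Build and run the OpenMetadata workflow from the YAML recipe above.
    workflow_config = yaml.safe_load(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "airflow_metadata_extraction",
    schedule_interval="@daily",
    start_date=datetime(2022, 6, 10),
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
```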
Using the Kubernetes Pod Operator
In this second approach we won't need to install anything in the GCS Composer environment. Instead, we will rely on the KubernetesPodOperator to use the underlying Kubernetes cluster of Composer.
The code then won't run directly in the host's environment, but rather inside a container that ships only the openmetadata-ingestion package (the openmetadata/ingestion-base image).
Note: The openmetadata/ingestion-base image is only available from version 0.12.1 onwards!
The only thing we need to handle here is getting the URL of the underlying Composer database. You can follow the official GCS docs for the steps to obtain the credentials.
In a nutshell, from the Airflow UI you can go to Admin > Configurations and search for sql_alchemy_conn. In our case, the URL looked like this:
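The exact value depends on your environment, but for a Postgres-backed Composer it follows the usual SQLAlchemy connection string shape (illustrative placeholders only):

```
postgresql+psycopg2://<user>:<password>@<host>:<port>/<database>
```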
As GCS uses Postgres for the backend database, our Airflow connection configuration will be shaped as:
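A possible sketch of that source connection in the ingestion recipe, filling the placeholders with the values extracted from sql_alchemy_conn (the service name and Airflow hostPort are assumptions to adapt):

```yaml
source:
  type: airflow
  serviceName: airflow_gcs_composer
  serviceConnection:
    config:
      type: Airflow
      hostPort: http://localhost:8080
      connection:
        type: Postgres
        username: <user>
        password: <password>
        hostPort: <host>:<port>
        database: <database>
  sourceConfig:
    config:
      type: PipelineMetadata
```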
For more information on how to shape the YAML describing the Airflow metadata extraction, you can refer here.
Prepare the DAG!
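Below is a minimal sketch of what the DAG could look like. It assumes the env vars (config and pipelineType) that the openmetadata/ingestion-base image expects; the namespace, service name, credentials path, and connection details are placeholders to adapt to your environment.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# Ingestion recipe passed to the container through an environment variable.
# Only the contents of this YAML should change between environments.
config = """
source:
  type: airflow
  serviceName: airflow_gcs_composer
  serviceConnection:
    config:
      type: Airflow
      hostPort: http://localhost:8080
      connection:
        type: Postgres
        username: <user>
        password: <password>
        hostPort: <host>:<port>
        database: <database>
  sourceConfig:
    config:
      type: PipelineMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: https://<your-openmetadata-host>/api
    authProvider: google
    securityConfig:
      secretKey: /home/airflow/gcs/data/gcs_creds_beta.json
"""

with DAG(
    "openmetadata_ingestion_k8s",
    schedule_interval="@daily",
    start_date=datetime(2022, 11, 10),
    catchup=False,
) as dag:
    ingest_task = KubernetesPodOperator(
        task_id="ingest",
        name="ingest",  # name the task/pod as you prefer
        # Do not change cmds: main.py inside the image reads the env vars below.
        cmds=["python", "main.py"],
        image="openmetadata/ingestion-base:0.13.2",
        namespace="default",  # adapt to your cluster setup
        env_vars={"config": config, "pipelineType": "metadata"},
    )
```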
Some remarks on this example code:
Kubernetes Pod Operator
- You can name the task as you want (name). The important points here are the cmds, which should not be changed, and the env_vars: the main.py script that gets shipped within the image will load the env vars as they are shown, so only modify the content of the config YAML, but not this dictionary.
- Note that the example uses the image openmetadata/ingestion-base:0.13.2. Update it accordingly for higher versions once they are released. Also, the image version should be aligned with your OpenMetadata server version to avoid incompatibilities.
- You can find more information about the KubernetesPodOperator and how to tune its configurations here.
OpenMetadata Server Config
The easiest approach here is to generate a bot with a JWT token directly from the OpenMetadata UI. You can then use the following workflow config:
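For instance, a sketch of the workflowConfig section using a bot JWT token (the hostPort and token are placeholders):

```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: https://<your-openmetadata-host>/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: <bot-jwt-token>
```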
Against Google SSO we need to use Cloud Storage to pass the secretKey JSON file. Upload the file to the gs://bucket-name/data directory, which will be mapped to /home/airflow/gcs/data/ in Airflow.
You can see in the example above how our file is named gcs_creds_beta.json, which gets resolved in Airflow as /home/airflow/gcs/data/gcs_creds_beta.json.
The workflow config here would look like:
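A sketch of that workflowConfig section under Google SSO, using the file path from above (the hostPort is a placeholder):

```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: https://<your-openmetadata-host>/api
    authProvider: google
    securityConfig:
      secretKey: /home/airflow/gcs/data/gcs_creds_beta.json
```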