Python SDK for Lineage

In this guide, we will use the Python SDK to create and fetch Lineage information.

For simplicity, we are going to create lineage between Tables. However, this would work with ANY entity.

You can find the Lineage Entity defined here, as well as the Entity defining the payload to add a new lineage: AddLineage.

In the following sections we will:

  • Create a Database Service, a Database, a Schema and two Tables,
  • Add Lineage between both Tables,
  • Get the Lineage information back.

A prerequisite for this section is to have previously gone through the following docs.

To prepare the necessary ingredients, execute the following steps.

All the Lineage-related functions that we are going to use can be found here.

We are mocking a MySQL instance. Note how we need to pass the right configuration class, MysqlConnection, as a parameter for the generic DatabaseConnection type.

Any Entity that is created and linked to another Entity has to hold the fullyQualifiedName of the Entity it relates to. In this case, a Database is bound to a specific service.

The same happens with the Schemas. They are related to a Database.

And finally, Tables are contained in a specific Schema, so we use the fullyQualifiedName here as well.
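To make the containment chain concrete, here is a minimal sketch of how the fullyQualifiedName values nest from service down to table. It assumes that names are joined with dots and contain no dots themselves. The entity names are hypothetical, and `child_fqn` is an illustrative helper, not part of the SDK.

```python
def child_fqn(parent_fqn: str, name: str) -> str:
    """Build the fully qualified name of an entity contained in a parent.

    Assumption: names are dot-separated and the child name has no dots,
    so no quoting is required.
    """
    if "." in name:
        raise ValueError("this sketch does not handle names containing dots")
    return f"{parent_fqn}.{name}"


# Hypothetical names for the mocked service and the entities it contains.
service_fqn = "test-service-db-lineage"
database_fqn = child_fqn(service_fqn, "test-db")
schema_fqn = child_fqn(database_fqn, "test-schema")
table_a_fqn = child_fqn(schema_fqn, "tableA")
table_b_fqn = child_fqn(schema_fqn, "tableB")

print(table_a_fqn)
print(table_b_fqn)
```

Each create request then refers to its parent through the parent's fullyQualifiedName, which is why the entities have to be created from the top of the chain downwards.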

We are doing a simple example with a single column.

With everything prepared, we can now create the Lineage between both Entities. An AddLineageRequest type represents the edge between two Entities, typed under EntitiesEdge.
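The shape of that request can be sketched with plain dictionaries. This is not the SDK's own typed classes: it only mirrors the JSON payload, assuming the field names `edge`, `fromEntity`, `toEntity`, `id` and `type`. The UUIDs are generated locally as stand-ins for the ids of the two Tables created earlier.

```python
import json
import uuid


def entity_ref(entity_id: str, entity_type: str) -> dict:
    """Minimal Entity Reference: the entity's id plus its type."""
    return {"id": entity_id, "type": entity_type}


def add_lineage_payload(from_ref: dict, to_ref: dict) -> dict:
    """Wrap two Entity References into a single lineage edge (assumed layout)."""
    return {"edge": {"fromEntity": from_ref, "toEntity": to_ref}}


# Stand-in ids; in practice these come from the created Table entities.
table_a_id = str(uuid.uuid4())
table_b_id = str(uuid.uuid4())

payload = add_lineage_payload(
    entity_ref(table_a_id, "table"),
    entity_ref(table_b_id, "table"),
)
print(json.dumps(payload, indent=2))
```

The edge is directional: the first reference is the lineage origin and the second is the destination.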

The Python client returns a JSON object with the Lineage information about the fromEntity node we added:

If the node already had other edges, they would show up here as well.

If we validate the Lineage from the UI, we will see:

[Image: simple-lineage]

Finally, let's fetch the lineage from the other node involved:

This gives us results symmetric to the ones above.
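The symmetry can be illustrated with two hand-written responses. The layout used here (`entity`, `nodes`, `upstreamEdges`, `downstreamEdges`, with edges holding plain ids) is an assumption about the returned JSON, and the ids and names are made up for the example. `summarize` is an illustrative helper, not an SDK function.

```python
from typing import Dict, List


def summarize(lineage: dict) -> Dict[str, List[str]]:
    """Resolve edge ids to node names and list upstream and downstream neighbours."""
    names = {lineage["entity"]["id"]: lineage["entity"]["name"]}
    names.update({node["id"]: node["name"] for node in lineage.get("nodes", [])})
    return {
        "upstream": [
            names.get(edge["fromEntity"], edge["fromEntity"])
            for edge in lineage.get("upstreamEdges", [])
        ],
        "downstream": [
            names.get(edge["toEntity"], edge["toEntity"])
            for edge in lineage.get("downstreamEdges", [])
        ],
    }


# Hypothetical response when asking for the lineage of Table A.
lineage_a = {
    "entity": {"id": "a-id", "type": "table", "name": "tableA"},
    "nodes": [{"id": "b-id", "type": "table", "name": "tableB"}],
    "upstreamEdges": [],
    "downstreamEdges": [{"fromEntity": "a-id", "toEntity": "b-id"}],
}

# Hypothetical response when asking for the lineage of Table B.
lineage_b = {
    "entity": {"id": "b-id", "type": "table", "name": "tableB"},
    "nodes": [{"id": "a-id", "type": "table", "name": "tableA"}],
    "upstreamEdges": [{"fromEntity": "a-id", "toEntity": "b-id"}],
    "downstreamEdges": [],
}

print(summarize(lineage_a))
print(summarize(lineage_b))
```

The same edge appears as a downstream edge of Table A and as an upstream edge of Table B.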

Lineage Details

Note how, when adding lineage information, we give the API an AddLineage request. This is composed of an Entity Edge, whose definition you can find here.

In a nutshell, an Entity Edge has:

  1. The Entity Reference as the lineage origin,
  2. The Entity Reference as the lineage destination,
  3. Optionally, Lineage Details.

In the Lineage Details property we can pass further information specific to Table-to-Table lineage:

  • sqlQuery specifying the transformation,
  • An array of columnsLineage as an object with an array of source and destination columns, as well as their own specific transformation function,
  • Optionally, the Entity Reference of a Pipeline powering the transformation from Table A to Table B.

The API call will be exactly the same as before, but now we will add more ingredients when defining our objects. Let's see how to do that and play with the possible combinations:

First, import the required classes and create a new table:

We can start by linking our columns together. For that we are going to create:

  1. A ColumnLineage object, linking our Table A column ID -> Table C column ID. Note that this can be a list!
  2. A LineageDetails object, passing the column lineage and the SQL query that powers the transformation.
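As a rough sketch of what these two objects carry, the snippet below builds the equivalent JSON with plain dictionaries instead of the SDK classes. The keys `sqlQuery` and `columnsLineage` come from the description above; `fromColumns`, `toColumn` and `function` are assumed field names, and the column names and query are hypothetical.

```python
import json
from typing import List, Optional


def column_lineage(
    from_columns: List[str], to_column: str, function: Optional[str] = None
) -> dict:
    """One column-level link: several source columns may feed one target column."""
    entry = {"fromColumns": list(from_columns), "toColumn": to_column}
    if function is not None:
        entry["function"] = function
    return entry


def lineage_details(sql_query: str, columns_lineage: List[dict]) -> dict:
    """Bundle the SQL query and the column links (assumed layout)."""
    return {"sqlQuery": sql_query, "columnsLineage": list(columns_lineage)}


# Hypothetical fully qualified column names for Table A and Table C.
schema_fqn = "test-service-db-lineage.test-db.test-schema"
details = lineage_details(
    sql_query="SELECT id FROM tableA",
    columns_lineage=[
        column_lineage(
            from_columns=[f"{schema_fqn}.tableA.id"],
            to_column=f"{schema_fqn}.tableC.id",
        )
    ],
)
print(json.dumps(details, indent=2))
```

Because the column lineage is a list, further column links between the same two tables can be appended to it and sent in the same request.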

This information will now be reflected in the UI as well:

[Image: lineage-col]

We can also pass the reference to the pipeline used to create the lineage (e.g., the ETL feeding the tables).

To prepare this example, we need to start by creating the Pipeline Entity. Again, we first need to prepare the Pipeline Service:

With these ingredients ready, we can then follow the code above and add a pipeline argument as an Entity Reference:
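In terms of the JSON that ends up in the request, this amounts to one extra key inside the Lineage Details. The sketch below uses plain dictionaries and assumes the key is named `pipeline` and holds an Entity Reference with `id` and `type`. The id is a locally generated stand-in for the id of the created Pipeline Entity, and `with_pipeline` is an illustrative helper.

```python
import uuid


def with_pipeline(details: dict, pipeline_id: str) -> dict:
    """Return a copy of the lineage details with a pipeline reference attached."""
    return {**details, "pipeline": {"id": pipeline_id, "type": "pipeline"}}


# Hypothetical details without column links, to keep the example short.
base_details = {"sqlQuery": "SELECT id FROM tableA", "columnsLineage": []}

# Stand-in for the id of the Pipeline Entity created in the previous step.
pipeline_id = str(uuid.uuid4())

details_with_pipeline = with_pipeline(base_details, pipeline_id)
print(details_with_pipeline)
```

The helper returns a new dictionary, so the same base details can be reused for edges that are not powered by a pipeline.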

Automated SQL lineage

If you want OpenMetadata to identify the lineage from a SQL query, you can use the add_lineage_by_query method of the Python SDK to parse the SQL and generate the lineage in OpenMetadata.

Follow the code snippet below for an example:

The above example would create lineage between target_table and source_table within the my_service database service.
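A hedged sketch of such a call is shown below. Only the method name add_lineage_by_query comes from this guide; the keyword arguments `database_service`, `sql` and `timeout` are assumptions about its signature, so check them against your installed SDK version. To keep the snippet runnable without a server, it is exercised against a small recording stand-in rather than a real client.

```python
from typing import Any


def add_sql_lineage(
    metadata: Any, database_service: Any, sql: str, timeout: int = 200
) -> None:
    """Ask the client to parse the query and create the lineage.

    Assumption: the keyword argument names below match the SDK method.
    """
    metadata.add_lineage_by_query(
        database_service=database_service,
        sql=sql,
        timeout=timeout,  # assumed to be expressed in seconds
    )


class RecordingClient:
    """Stand-in for the real client: it only records the calls it receives."""

    def __init__(self) -> None:
        self.calls = []

    def add_lineage_by_query(self, **kwargs: Any) -> None:
        self.calls.append(kwargs)


client = RecordingClient()
add_sql_lineage(
    client,
    database_service="my_service",  # the real call would pass the service entity
    sql="INSERT INTO target_table (id) SELECT id FROM source_table",
)
print(client.calls)
```

With a real client, `metadata` would be the connected OpenMetadata instance and `database_service` the service entity fetched from the server.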

To create automated SQL lineage via the CLI, make sure that you have installed the openmetadata-ingestion package in your local environment using the command pip install openmetadata-ingestion.

Once that is done, you will have to prepare a YAML file as follows.

  • serviceName: Name of the database service that contains the tables involved in the query.
  • query: The raw SQL query, specified within the YAML file itself.
  • filePath: If the query is too big, you can instead save it in a file and pass the path to that file in this field.
  • parseTimeout: Timeout for the lineage parsing process.
  • workflowConfig: The main property here is the openMetadataServerConfig, where you can define the host and security provider of your OpenMetadata installation.
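The fields above can be laid out as follows. This sketch builds the configuration as a Python dictionary and prints it as JSON, which YAML parsers generally accept since JSON is valid YAML flow syntax. The key names `hostPort` and `authProvider` inside openMetadataServerConfig are assumptions, and all values are placeholders to be replaced with your own.

```python
import json

# Placeholder values throughout; only the top-level keys come from the list above.
lineage_config = {
    "serviceName": "my_service",
    "query": "INSERT INTO target_table (id) SELECT id FROM source_table",
    # "filePath": "path/to/query.sql",  # alternative to "query" for large queries
    "parseTimeout": 360,  # assumed to be expressed in seconds
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",  # assumed key name
            "authProvider": "openmetadata",  # assumed key name
        }
    },
}

print(json.dumps(lineage_config, indent=2))
```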

Once the YAML file is prepared, you can run the command