Python SDK

We are now going to present a high-level Python API as a type-safe and gentle wrapper for the OpenMetadata backend.

Note

The Python SDK is part of the openmetadata-ingestion base package. You can install it from PyPI.

Make sure to use the same openmetadata-ingestion version as your server version. For example, if you have the OpenMetadata server at version 0.13.0, you will need to install:

pip install "openmetadata-ingestion~=0.13.0"

In the OpenMetadata Design, we have been dissecting the internals of OpenMetadata. The main conclusion here is twofold:

  • Everything is handled via the API, and
  • Data structures (Entity definitions) are at the heart of the solution.

This means that whenever we need to interact with the metadata system or develop a new connector or logic, we have to make sure that we pass the proper inputs and handle the types of outputs.

Let's suppose that we have our local OpenMetadata server running at http:localhost:8585. We can play with it with simple cURL or httpie commands, and if we just want to take a look at the Entity instances we have lying around, that might probably be enough.

However, let's imagine that we want to create or update an ML Model Entity with a PUT. To do so, we need to make sure that we are providing a proper JSON, covering all the attributes and types required by the Entity definition.

By reviewing the JSON Schema for the create operation and the fields definitions of the Entity, we could come up with a rather simple description of a toy ML Model:

{
    "name": "my-model",
    "description": "sample ML Model",
    "algorithm": "regression",
    "mlFeatures": [
        {
            "name": "age",
            "dataType": "numerical",
            "featureSources": [
                {
                    "name": "age",
                    "dataType": "integer"
                }
            ]
        },
        {
            "name": "persona",
            "dataType": "categorical",
            "featureSources": [
                {
                    "name": "age",
                    "dataType": "integer"
                },
                {
                    "name": "education",
                    "dataType": "string"
                }
            ],
            "featureAlgorithm": "PCA"
        }
    ],
    "mlHyperParameters": [
        {
            "name": "regularisation",
            "value": "0.5"
        }
    ]
}

If we needed to repeat this process with a full-fledged model that is built ad-hoc and updated during the CICD process, we would just be adding a hardly maintainable, error-prone requirement to our production deployment pipelines.

The same would happen if, inside the actual OpenMetadata code, there was not a way to easily interact with the API and make sure that we send proper data and can safely process the outputs.

As OpenMetadata is a data-centric solution, we need to make sure we have the right ingredients at all times. That is why we have developed a high-level Python API, using pydantic models automatically generated from the JSON Schemas.

OBS: If you are using a published version of the Ingestion Framework, you are already good to go, as we package the code with the metadata.generated module. If you are developing a new feature, you can get more information here.

This API wrapper helps developers and consumers in:

  • Validating data during development and with specific error messages at runtime,
  • Receiving typed responses to ease further processing.

Thanks to the recursive model setting of pydantic the example above can be rewritten using only Python classes, and thus being able to get help from IDEs and the Python interpreter. We can rewrite the previous JSON as:

from metadata.generated.schema.api.data.createMlModel import CreateMlModelRequest

from metadata.generated.schema.entity.data.mlmodel import (
    FeatureSource,
    FeatureSourceDataType,
    FeatureType,
    MlFeature,
    MlHyperParameter,
    MlModel,
)

model = CreateMlModelRequest(
    name="test-model-properties",
    algorithm="algo",
    mlFeatures=[
        MlFeature(
            name="age",
            dataType=FeatureType.numerical,
            featureSources=[
                FeatureSource(
                    name="age",
                    dataType=FeatureSourceDataType.integer,
                )
            ],
        ),
        MlFeature(
            name="persona",
            dataType=FeatureType.categorical,
            featureSources=[
                FeatureSource(
                    name="age",
                    dataType=FeatureSourceDataType.integer,
                ),
                FeatureSource(
                    name="education",
                    dataType=FeatureSourceDataType.string,
                ),
            ],
            featureAlgorithm="PCA",
        ),
    ],
    mlHyperParameters=[
        MlHyperParameter(name="regularisation", value="0.5"),
    ],
)

Now that we know how to directly use the pydantic models, we can start showcasing the solution. This module has been built with two main principles in mind:

  • Reusability: We should be able to support existing and new entities with minimum effort,
  • Extensibility: However, we are aware that not all Entities are the same. Some of them may require specific functionalities or slight variations (such as Lineage or Location), so it should be easy to identify those special methods and create new ones when needed.

To this end, we have the main class OpenMetadata (source) based on Python's TypeVar. Thanks to this we can exploit the complete power of the pydantic models, having methods with Type Parameters that know how to respond to each Entity.

At the same time, we have the Mixins (source) module, with special extensions to some Entities.

Let's use Python's API to create, update and delete a Table Entity. Choosing the Table is a nice starter, as its attributes define the following hierarchy:

DatabaseService -> Database -> Schema -> Table

This will help us showcase how we can reuse the same syntax with the three different Entities.

OpenMetadata is the class holding the connection to the API and handling the requests. We can instantiate this by passing the proper configuration to reach the server API:

from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)

server_config = OpenMetadataConnection(hostPort="http://localhost:8585/api")
metadata = OpenMetadata(server_config)

As this is just using a local development, the OpenMetadataConnection is rather simple. However, in there we would prepare settings such as authProvider or securityConfig.

Note

The OpenMetadataConnection is defined as a JSON Schema as well. You can check the definition here

From this point onwards, we will interact with the API by using OpenMetadata methods.

An interesting validation we can already make at this point is verifying that the service is reachable and healthy. To do so, we can validate the Bool output from:

metadata.health_check()  # `True` means we are alright :)

Following the hierarchy, we need to start by defining a DatabaseService. This will be system hosting our Database, which will contain the Table.

Recall how we have mainly two types of models:

  • Entity definitions, such as Table, MlModel or Topic.
  • API definitions, useful when running a PUT, POST or PATCH request: CreateTable, CreateMlModel or CreateTopic.

As we are just creating Entities right now, we'll stick to the pydantic models with the API definitions.

Let's imagine that we are defining a MySQL:

from metadata.generated.schema.api.services.createDatabaseService import (
    CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.services.databaseService import (
    DatabaseService,
    DatabaseServiceType,
    DatabaseConnection,
)

from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
    MysqlConnection,
)

create_service = CreateDatabaseServiceRequest(
    name="test-service-table",
    serviceType=DatabaseServiceType.Mysql,
    connection=DatabaseConnection(
        config=MysqlConnection(
            username="username",
            password="password",
            hostPort="http://localhost:1234",
        )
    ),
)

Note how we can use both String definitions for the attributes, as well as specific types when possible, such as serviceType=DatabaseServiceType.Mysql. The less information we need to hardcode, the better.

Another important point here is that the connection definitions are centralized as JSON Schemas. Here you can find the root of all of them.

We can review the information that will be passed to the API by visiting the JSON definition of the class we just instantiated. As all these models are powered by pydantic, this conversion is transparent to us:

create_service.json()
# '{"name": "test-service-table", "description": null, "serviceType": "Mysql", "connection": {"config": {"type": "Mysql", "scheme": "mysql+pymysql", "username": "username", "password": "**********", "hostPort": "http://localhost:1234", "database": null, "connectionOptions": null, "connectionArguments": null, "supportsMetadataExtraction": null, "supportsProfiler": null}}, "owner": null}'

Executing the actual creation is easy! As our create_service variable already holds the proper datatype, there is a single line to execute:

service_entity = metadata.create_or_update(data=create_service)

Moreover, running a create_or_update will return us the Entity type, so we can explore its attributes easily:

type(service_entity)
# metadata.generated.schema.entity.services.databaseService.DatabaseService

service_entity.json()
# '{"id": "a823952a-1fc1-46d4-bd0e-27f9812871f4", "name": "test-service-table", "displayName": null, "serviceType": "Mysql", "description": null, "connection": {"config": {"type": "Mysql", "scheme": "mysql+pymysql", "username": "username", "password": "**********", "hostPort": "http://localhost:1234", "database": null, "connectionOptions": null, "connectionArguments": null, "supportsMetadataExtraction": null, "supportsProfiler": null}}, "pipelines": null, "version": 0.1, "updatedAt": 1651237632058, "updatedBy": "anonymous", "owner": null, "href": "http://localhost:8585/api/v1/services/databaseServices/a823952a-1fc1-46d4-bd0e-27f9812871f4", "changeDescription": null, "deleted": false}'

We can now repeat the process to create a Database Entity. However, if we review the definition of the CreateDatabaseEntityRequest model...

class CreateDatabaseRequest(BaseModel):
    class Config:
        extra = Extra.forbid
    
    name: basic.EntityName = Field(
        ..., description='Name that identifies this database instance uniquely.'
    )
    description: Optional[str] = Field(
        None,
        description='Description of the database instance. What it has and how to use it.',
    )
    owner: Optional[entityReference.EntityReference] = Field(
        None, description='Owner of this database'
    )
    service: entityReference.EntityReference = Field(
     ..., description='Link to the database service where this database is hosted in'
    )
    default: Optional[bool] = Field(
        False,
        description="Some databases don't support a database/catalog in the hierarchy and use default database. For example, `MySql`. For such databases, set this flag to true to indicate that this is a default database.",
    )

Note how the only non-optional fields are name and service. The type of service, however, is EntityReference. This is expected, as there we need to pass the information of an existing Entity. In our case, the DatabaseService we just created.

Repeating the exercise and reviewing the required fields to instantiate an EntityReference we notice how we need to pass an id: uuid.UUID and type: str. Here we need to specify the id and type of our DatabaseService.

Querying by name

The id we actually saw it by printing the service_entity JSON. However, let's imagine that it did not happen, and the only information we have from the DatabaseService is its name.

To retrieve the id, we should then ask the metadata to find our Entity by its FQN:

service_query = metadata.get_by_name(entity=DatabaseService, fqn="test-service-table")

We have just used the get_by_name method. This method is the same that we will use for any Entity. This is why as an argument, we need to provide the entity field. Again, instead of relying on error-prone handwritten parameters, we can just pass the pydantic model we expect to get back. In our case, a DatabaseService.

Let's now pass the DatabaseService id to instantiate the EntityReference. We do not even need to cast it to str, as the EntityReference class expects a UUID as well:

from metadata.generated.schema.api.data.createDatabase import (
    CreateDatabaseRequest,
)
from metadata.generated.schema.type.entityReference import EntityReference

create_db = CreateDatabaseRequest(
    name="test-db",
    service=EntityReference(id=service_entity.id, type="databaseService"),
)

db_entity = metadata.create_or_update(create_db)

With the addition of the Schema Entity in 0.10, we now also need to create a Schema, which will be the one containing the Tables. As this entity is a link between other entities, an Entity Reference will be required too.

from metadata.generated.schema.api.data.createDatabaseSchema import (
    CreateDatabaseSchemaRequest,
)

create_schema = CreateDatabaseSchemaRequest(
    name="test-schema",
    database=EntityReference(id=db_entity.id, type="database")
)

schema_entity = metadata.create_or_update(data=create_schema)

# We can prepare the EntityReference that will be needed
# in the next step!
schema_reference = EntityReference(
    id=schema_entity.id, name="test-schema", type="databaseSchema"
)

Now that we have all the preparations ready, we can just reuse the same steps to create the Table:

from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.table import (
    Column,
    DataType,
    Table,
)

create_table = CreateTableRequest(
    name="test",
    databaseSchema=schema_reference,
    columns=[Column(name="id", dataType=DataType.BIGINT)],
)

table_entity = metadata.create_or_update(create_table)

Let's now update the Table by adding an owner. This will require us to create a User, and then update the Table with it. Afterwards, we will validate that the information has been properly stored.

First, make sure that no owner has been set during the creation:

print(table_entity.owner)
# None

Now, create a User:

from metadata.generated.schema.api.teams.createUser import CreateUserRequest

user = metadata.create_or_update(
    data=CreateUserRequest(name="random-user", email="random@user.com"),
)

Update our instace of create_table to add the owner field (we need to use the Create class as we'll run a PUT), and update the Entity:

create_table.owner = EntityReference(id=user.id, type="user")
updated_table_entity = metadata.create_or_update(create_table)

print(updated_table_entity.owner)
# EntityReference(id=Uuid(__root__=UUID('48793f0c-5308-45c1-9bf4-06a82c8d7bf9')), type='user', name='random-user', description=None, displayName=None, href=Href(__root__=AnyUrl('http://localhost:8585/api/v1/users/48793f0c-5308-45c1-9bf4-06a82c8d7bf9', scheme='http', host='localhost', host_type='int_domain', port='8585', path='/api/v1/users/48793f0c-5308-45c1-9bf4-06a82c8d7bf9')))

If we did not save the updated_table_entity variable and we should need to query it to review the owner field, we can run the get_by_name using the proper FQN definition for Tables:

my_table = metadata.get_by_name(entity=Table, fqn="test-service-table.test-db.test-schema.test")7. Delete the Table

Note

When querying an Entity we might not find it! The Entity could not exist, or there might be an error in the id or fullyQualifiedName.

In those cases, the get method won't fail, but instead will return None. Note that the signature of the get methods is Optional[T], so make sure to validate that there is data coming back!

Finally, we can clean up the Table by running the delete method:

metadata.delete(entity=Table, entity_id=my_table.id)

We could directly clean up the service itself with a Hard and Recursive delete. Note that this is ok for this test, but beware when working with production data!

service_id = metadata.get_by_name(
    entity=DatabaseService, fqn="test-service-table"
).id

metadata.delete(
    entity=DatabaseService,
    entity_id=service_id,
    recursive=True,
    hard_delete=True,
)

Still have questions?

You can take a look at our Q&A or reach out to us in Slack

Was this page helpful?

editSuggest edits