from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
server_config = OpenMetadataConnection(
hostPort="https://your-company.open-metadata.org/api",
authProvider="openmetadata",
securityConfig=OpenMetadataJWTClientConfig(
jwtToken="<YOUR-JWT-TOKEN>"
),
)
metadata = OpenMetadata(server_config)
# Simple lineage between two tables
add_lineage_request = AddLineageRequest(
edge=EntitiesEdge(
description="ETL transformation",
fromEntity=EntityReference(id=source_table.id, type="table"),
toEntity=EntityReference(id=target_table.id, type="table"),
),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)
# With column-level lineage and SQL query
column_lineage = ColumnLineage(
fromColumns=["service.db.schema.source_table.customer_id"],
toColumn="service.db.schema.target_table.cust_id",
)
lineage_details = LineageDetails(
sqlQuery="INSERT INTO target_table SELECT customer_id FROM source_table",
columnsLineage=[column_lineage],
pipeline=EntityReference(id=pipeline_entity.id, type="pipeline"),
)
add_lineage_request = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=source_table.id, type="table"),
toEntity=EntityReference(id=target_table.id, type="table"),
lineageDetails=lineage_details,
),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)
# Automated SQL lineage (parses SQL to extract lineage)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
database_service = metadata.get_by_name(entity=DatabaseService, fqn="my_service")
metadata.add_lineage_by_query(
database_service=database_service,
sql="INSERT INTO target_table(id) SELECT id FROM source_table",
timeout=200,
)
{
"entity": {
"id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
"type": "table",
"name": "source_table",
"fullyQualifiedName": "sample_data.ecommerce_db.shopify.source_table",
"deleted": false,
"href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
},
"nodes": [
{
"id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
"type": "table",
"name": "target_table",
"fullyQualifiedName": "sample_data.ecommerce_db.shopify.target_table",
"deleted": false,
"href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
}
],
"upstreamEdges": [],
"downstreamEdges": [
{
"fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
"toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e",
"lineageDetails": {
"sqlQuery": "INSERT INTO target SELECT id, name FROM source",
"columnsLineage": [
{
"fromColumns": [
"sample_data.ecommerce_db.shopify.source_table.id"
],
"toColumn": "sample_data.ecommerce_db.shopify.target_table.id"
}
]
}
}
]
}
Create a lineage edge between two entities
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
server_config = OpenMetadataConnection(
hostPort="https://your-company.open-metadata.org/api",
authProvider="openmetadata",
securityConfig=OpenMetadataJWTClientConfig(
jwtToken="<YOUR-JWT-TOKEN>"
),
)
metadata = OpenMetadata(server_config)
# Simple lineage between two tables
add_lineage_request = AddLineageRequest(
edge=EntitiesEdge(
description="ETL transformation",
fromEntity=EntityReference(id=source_table.id, type="table"),
toEntity=EntityReference(id=target_table.id, type="table"),
),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)
# With column-level lineage and SQL query
column_lineage = ColumnLineage(
fromColumns=["service.db.schema.source_table.customer_id"],
toColumn="service.db.schema.target_table.cust_id",
)
lineage_details = LineageDetails(
sqlQuery="INSERT INTO target_table SELECT customer_id FROM source_table",
columnsLineage=[column_lineage],
pipeline=EntityReference(id=pipeline_entity.id, type="pipeline"),
)
add_lineage_request = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=source_table.id, type="table"),
toEntity=EntityReference(id=target_table.id, type="table"),
lineageDetails=lineage_details,
),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)
# Automated SQL lineage (parses SQL to extract lineage)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
database_service = metadata.get_by_name(entity=DatabaseService, fqn="my_service")
metadata.add_lineage_by_query(
database_service=database_service,
sql="INSERT INTO target_table(id) SELECT id FROM source_table",
timeout=200,
)
{
"entity": {
"id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
"type": "table",
"name": "source_table",
"fullyQualifiedName": "sample_data.ecommerce_db.shopify.source_table",
"deleted": false,
"href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
},
"nodes": [
{
"id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
"type": "table",
"name": "target_table",
"fullyQualifiedName": "sample_data.ecommerce_db.shopify.target_table",
"deleted": false,
"href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
}
],
"upstreamEdges": [],
"downstreamEdges": [
{
"fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
"toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e",
"lineageDetails": {
"sqlQuery": "INSERT INTO target SELECT id, name FROM source",
"columnsLineage": [
{
"fromColumns": [
"sample_data.ecommerce_db.shopify.source_table.id"
],
"toColumn": "sample_data.ecommerce_db.shopify.target_table.id"
}
]
}
}
]
}
Documentation Index
Fetch the complete documentation index at: https://docs.open-metadata.org/llms.txt
Use this file to discover all available pages before exploring further.
Show properties
Show properties
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
server_config = OpenMetadataConnection(
hostPort="https://your-company.open-metadata.org/api",
authProvider="openmetadata",
securityConfig=OpenMetadataJWTClientConfig(
jwtToken="<YOUR-JWT-TOKEN>"
),
)
metadata = OpenMetadata(server_config)
# Simple lineage between two tables
add_lineage_request = AddLineageRequest(
edge=EntitiesEdge(
description="ETL transformation",
fromEntity=EntityReference(id=source_table.id, type="table"),
toEntity=EntityReference(id=target_table.id, type="table"),
),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)
# With column-level lineage and SQL query
column_lineage = ColumnLineage(
fromColumns=["service.db.schema.source_table.customer_id"],
toColumn="service.db.schema.target_table.cust_id",
)
lineage_details = LineageDetails(
sqlQuery="INSERT INTO target_table SELECT customer_id FROM source_table",
columnsLineage=[column_lineage],
pipeline=EntityReference(id=pipeline_entity.id, type="pipeline"),
)
add_lineage_request = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=source_table.id, type="table"),
toEntity=EntityReference(id=target_table.id, type="table"),
lineageDetails=lineage_details,
),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)
# Automated SQL lineage (parses SQL to extract lineage)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
database_service = metadata.get_by_name(entity=DatabaseService, fqn="my_service")
metadata.add_lineage_by_query(
database_service=database_service,
sql="INSERT INTO target_table(id) SELECT id FROM source_table",
timeout=200,
)
{
"entity": {
"id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
"type": "table",
"name": "source_table",
"fullyQualifiedName": "sample_data.ecommerce_db.shopify.source_table",
"deleted": false,
"href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
},
"nodes": [
{
"id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
"type": "table",
"name": "target_table",
"fullyQualifiedName": "sample_data.ecommerce_db.shopify.target_table",
"deleted": false,
"href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
}
],
"upstreamEdges": [],
"downstreamEdges": [
{
"fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
"toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e",
"lineageDetails": {
"sqlQuery": "INSERT INTO target SELECT id, name FROM source",
"columnsLineage": [
{
"fromColumns": [
"sample_data.ecommerce_db.shopify.source_table.id"
],
"toColumn": "sample_data.ecommerce_db.shopify.target_table.id"
}
]
}
}
]
}
fromEntity node, including the newly created edge.
| Code | Error Type | Description |
|---|---|---|
400 | BAD_REQUEST | Invalid request body or missing required fields |
401 | UNAUTHORIZED | Invalid or missing authentication token |
403 | FORBIDDEN | User lacks permission to create lineage |
404 | NOT_FOUND | One of the referenced entities does not exist |
Was this page helpful?