from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
# Connect to the OpenMetadata server, authenticating with a JWT token.
server_config = OpenMetadataConnection(
    hostPort="https://your-company.open-metadata.org/api",
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(
        jwtToken="<YOUR-JWT-TOKEN>"
    ),
)
metadata = OpenMetadata(server_config)

# Resolve the two tables the lineage edges connect. The FQNs are placeholders
# that match the column FQNs used in the column-level example below; replace
# them with the fully qualified names of your own tables.
source_table = metadata.get_by_name(
    entity=Table, fqn="service.db.schema.source_table"
)
target_table = metadata.get_by_name(
    entity=Table, fqn="service.db.schema.target_table"
)
# Fail early with a clear message instead of an AttributeError on `.id` below.
# NOTE(review): assumes get_by_name returns None for an unknown FQN - confirm
# against the SDK version in use.
if source_table is None or target_table is None:
    raise LookupError("Source or target table not found; check the table FQNs")

# Simple lineage between two tables
add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        description="ETL transformation",
        fromEntity=EntityReference(id=source_table.id, type="table"),
        toEntity=EntityReference(id=target_table.id, type="table"),
    ),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)

# With column-level lineage and SQL query
from metadata.generated.schema.entity.data.pipeline import Pipeline

# Pipeline referenced from the lineage details (placeholder FQN - replace it
# with the fully qualified name of your own pipeline).
pipeline_entity = metadata.get_by_name(
    entity=Pipeline, fqn="pipeline_service.etl_pipeline"
)
if pipeline_entity is None:
    raise LookupError("Pipeline not found; check the pipeline FQN")

# Maps one or more source columns (by FQN) to a single target column.
column_lineage = ColumnLineage(
    fromColumns=["service.db.schema.source_table.customer_id"],
    toColumn="service.db.schema.target_table.cust_id",
)
lineage_details = LineageDetails(
    sqlQuery="INSERT INTO target_table SELECT customer_id FROM source_table",
    columnsLineage=[column_lineage],
    pipeline=EntityReference(id=pipeline_entity.id, type="pipeline"),
)
add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=source_table.id, type="table"),
        toEntity=EntityReference(id=target_table.id, type="table"),
        lineageDetails=lineage_details,
    ),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)

# Automated SQL lineage (parses SQL to extract lineage)
from metadata.generated.schema.entity.services.databaseService import DatabaseService

database_service = metadata.get_by_name(entity=DatabaseService, fqn="my_service")
metadata.add_lineage_by_query(
    database_service=database_service,
    sql="INSERT INTO target_table(id) SELECT id FROM source_table",
    timeout=200,  # presumably seconds allowed for SQL parsing - TODO confirm
)
{
"entity": {
"id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
"type": "table",
"name": "source_table",
"fullyQualifiedName": "sample_data.ecommerce_db.shopify.source_table",
"deleted": false,
"href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
},
"nodes": [
{
"id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
"type": "table",
"name": "target_table",
"fullyQualifiedName": "sample_data.ecommerce_db.shopify.target_table",
"deleted": false,
"href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
}
],
"upstreamEdges": [],
"downstreamEdges": [
{
"fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
"toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e",
"lineageDetails": {
"sqlQuery": "INSERT INTO target SELECT id, name FROM source",
"columnsLineage": [
{
"fromColumns": [
"sample_data.ecommerce_db.shopify.source_table.id"
],
"toColumn": "sample_data.ecommerce_db.shopify.target_table.id"
}
]
}
}
]
}
Create a lineage edge between two entities
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
# Connect to the OpenMetadata server, authenticating with a JWT token.
server_config = OpenMetadataConnection(
    hostPort="https://your-company.open-metadata.org/api",
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(
        jwtToken="<YOUR-JWT-TOKEN>"
    ),
)
metadata = OpenMetadata(server_config)

# Resolve the two tables the lineage edges connect. The FQNs are placeholders
# that match the column FQNs used in the column-level example below; replace
# them with the fully qualified names of your own tables.
source_table = metadata.get_by_name(
    entity=Table, fqn="service.db.schema.source_table"
)
target_table = metadata.get_by_name(
    entity=Table, fqn="service.db.schema.target_table"
)
# Fail early with a clear message instead of an AttributeError on `.id` below.
# NOTE(review): assumes get_by_name returns None for an unknown FQN - confirm
# against the SDK version in use.
if source_table is None or target_table is None:
    raise LookupError("Source or target table not found; check the table FQNs")

# Simple lineage between two tables
add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        description="ETL transformation",
        fromEntity=EntityReference(id=source_table.id, type="table"),
        toEntity=EntityReference(id=target_table.id, type="table"),
    ),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)

# With column-level lineage and SQL query
from metadata.generated.schema.entity.data.pipeline import Pipeline

# Pipeline referenced from the lineage details (placeholder FQN - replace it
# with the fully qualified name of your own pipeline).
pipeline_entity = metadata.get_by_name(
    entity=Pipeline, fqn="pipeline_service.etl_pipeline"
)
if pipeline_entity is None:
    raise LookupError("Pipeline not found; check the pipeline FQN")

# Maps one or more source columns (by FQN) to a single target column.
column_lineage = ColumnLineage(
    fromColumns=["service.db.schema.source_table.customer_id"],
    toColumn="service.db.schema.target_table.cust_id",
)
lineage_details = LineageDetails(
    sqlQuery="INSERT INTO target_table SELECT customer_id FROM source_table",
    columnsLineage=[column_lineage],
    pipeline=EntityReference(id=pipeline_entity.id, type="pipeline"),
)
add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=source_table.id, type="table"),
        toEntity=EntityReference(id=target_table.id, type="table"),
        lineageDetails=lineage_details,
    ),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)

# Automated SQL lineage (parses SQL to extract lineage)
from metadata.generated.schema.entity.services.databaseService import DatabaseService

database_service = metadata.get_by_name(entity=DatabaseService, fqn="my_service")
metadata.add_lineage_by_query(
    database_service=database_service,
    sql="INSERT INTO target_table(id) SELECT id FROM source_table",
    timeout=200,  # presumably seconds allowed for SQL parsing - TODO confirm
)
{
"entity": {
"id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
"type": "table",
"name": "source_table",
"fullyQualifiedName": "sample_data.ecommerce_db.shopify.source_table",
"deleted": false,
"href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
},
"nodes": [
{
"id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
"type": "table",
"name": "target_table",
"fullyQualifiedName": "sample_data.ecommerce_db.shopify.target_table",
"deleted": false,
"href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
}
],
"upstreamEdges": [],
"downstreamEdges": [
{
"fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
"toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e",
"lineageDetails": {
"sqlQuery": "INSERT INTO target SELECT id, name FROM source",
"columnsLineage": [
{
"fromColumns": [
"sample_data.ecommerce_db.shopify.source_table.id"
],
"toColumn": "sample_data.ecommerce_db.shopify.target_table.id"
}
]
}
}
]
}
Show properties
Show properties
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
# Connect to the OpenMetadata server, authenticating with a JWT token.
server_config = OpenMetadataConnection(
    hostPort="https://your-company.open-metadata.org/api",
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(
        jwtToken="<YOUR-JWT-TOKEN>"
    ),
)
metadata = OpenMetadata(server_config)

# Resolve the two tables the lineage edges connect. The FQNs are placeholders
# that match the column FQNs used in the column-level example below; replace
# them with the fully qualified names of your own tables.
source_table = metadata.get_by_name(
    entity=Table, fqn="service.db.schema.source_table"
)
target_table = metadata.get_by_name(
    entity=Table, fqn="service.db.schema.target_table"
)
# Fail early with a clear message instead of an AttributeError on `.id` below.
# NOTE(review): assumes get_by_name returns None for an unknown FQN - confirm
# against the SDK version in use.
if source_table is None or target_table is None:
    raise LookupError("Source or target table not found; check the table FQNs")

# Simple lineage between two tables
add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        description="ETL transformation",
        fromEntity=EntityReference(id=source_table.id, type="table"),
        toEntity=EntityReference(id=target_table.id, type="table"),
    ),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)

# With column-level lineage and SQL query
from metadata.generated.schema.entity.data.pipeline import Pipeline

# Pipeline referenced from the lineage details (placeholder FQN - replace it
# with the fully qualified name of your own pipeline).
pipeline_entity = metadata.get_by_name(
    entity=Pipeline, fqn="pipeline_service.etl_pipeline"
)
if pipeline_entity is None:
    raise LookupError("Pipeline not found; check the pipeline FQN")

# Maps one or more source columns (by FQN) to a single target column.
column_lineage = ColumnLineage(
    fromColumns=["service.db.schema.source_table.customer_id"],
    toColumn="service.db.schema.target_table.cust_id",
)
lineage_details = LineageDetails(
    sqlQuery="INSERT INTO target_table SELECT customer_id FROM source_table",
    columnsLineage=[column_lineage],
    pipeline=EntityReference(id=pipeline_entity.id, type="pipeline"),
)
add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=source_table.id, type="table"),
        toEntity=EntityReference(id=target_table.id, type="table"),
        lineageDetails=lineage_details,
    ),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)

# Automated SQL lineage (parses SQL to extract lineage)
from metadata.generated.schema.entity.services.databaseService import DatabaseService

database_service = metadata.get_by_name(entity=DatabaseService, fqn="my_service")
metadata.add_lineage_by_query(
    database_service=database_service,
    sql="INSERT INTO target_table(id) SELECT id FROM source_table",
    timeout=200,  # presumably seconds allowed for SQL parsing - TODO confirm
)
{
"entity": {
"id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
"type": "table",
"name": "source_table",
"fullyQualifiedName": "sample_data.ecommerce_db.shopify.source_table",
"deleted": false,
"href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
},
"nodes": [
{
"id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
"type": "table",
"name": "target_table",
"fullyQualifiedName": "sample_data.ecommerce_db.shopify.target_table",
"deleted": false,
"href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
}
],
"upstreamEdges": [],
"downstreamEdges": [
{
"fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
"toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e",
"lineageDetails": {
"sqlQuery": "INSERT INTO target SELECT id, name FROM source",
"columnsLineage": [
{
"fromColumns": [
"sample_data.ecommerce_db.shopify.source_table.id"
],
"toColumn": "sample_data.ecommerce_db.shopify.target_table.id"
}
]
}
}
]
}
The response contains the lineage graph of the `fromEntity` node, including the newly created edge.
| Code | Error Type | Description |
|---|---|---|
| 400 | BAD_REQUEST | Invalid request body or missing required fields |
| 401 | UNAUTHORIZED | Invalid or missing authentication token |
| 403 | FORBIDDEN | User lacks permission to create lineage |
| 404 | NOT_FOUND | One of the referenced entities does not exist |
Was this page helpful?