Java SDK for Lineage
The Java SDK provides a fluent API for managing data lineage in OpenMetadata. You can query lineage graphs, create edges between entities (optionally with column-level mappings), delete edges, export lineage, and run impact analysis.
The fluent entry point is the Lineage class in org.openmetadata.sdk.api.
Lineage in OpenMetadata is a relationship between any two entities. While the examples below use tables and dashboards, the same API works with any entity type (pipelines, topics, ML models, etc.).
Setup
Before using the fluent API, initialize the client:
import org.openmetadata.sdk.api.Lineage;
import org.openmetadata.sdk.client.OpenMetadataClient;
OpenMetadataClient client = OpenMetadataClient.builder()
    .baseUrl("http://localhost:8585/api")
    .jwtToken("<YOUR-JWT-TOKEN>")
    .build();
Lineage.setDefaultClient(client);
Querying Lineage
Retrieve the lineage graph for an entity by type and ID:
// Get lineage with default depth (1 upstream, 1 downstream)
Lineage.LineageGraph graph = Lineage.of("table", tableId).fetch();
// Get the raw JSON response
String raw = graph.getRaw();
Custom Depth
Control how many hops upstream and downstream are returned:
Lineage.LineageGraph graph = Lineage.of("table", tableId)
    .upstream(3)
    .downstream(2)
    .fetch();
Or set both directions to the same value:
Lineage.LineageGraph graph = Lineage.of("table", tableId)
    .depth(5)
    .fetch();
Include Deleted Entities
Pass includeDeleted(true) to include deleted entities in the returned graph:
Lineage.LineageGraph graph = Lineage.of("table", tableId)
    .includeDeleted(true)
    .fetch();
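These builder options appear to correspond to query parameters of the lineage REST endpoint, which can help when reproducing a query with curl or a browser. The sketch below assumes the endpoint GET {baseUrl}/v1/lineage/{entityType}/{id} with upstreamDepth, downstreamDepth, and includeDeleted query parameters; it only builds the URL with java.net.URI and does not call the server. The class and method names are hypothetical, and the SDK may construct its requests differently.

```java
import java.net.URI;
import java.util.UUID;

/** Hypothetical helper: builds the lineage query URL the fluent builder is assumed to call. */
final class LineageUrlSketch {
    static URI lineageUri(String baseUrl, String entityType, UUID id,
                          int upstreamDepth, int downstreamDepth, boolean includeDeleted) {
        if (upstreamDepth < 0 || downstreamDepth < 0) {
            throw new IllegalArgumentException("depth must be non-negative");
        }
        // Assumed endpoint shape:
        // {baseUrl}/v1/lineage/{entityType}/{id}?upstreamDepth=..&downstreamDepth=..&includeDeleted=..
        String url = String.format(
                "%s/v1/lineage/%s/%s?upstreamDepth=%d&downstreamDepth=%d&includeDeleted=%b",
                baseUrl, entityType, id, upstreamDepth, downstreamDepth, includeDeleted);
        return URI.create(url);
    }

    public static void main(String[] args) {
        UUID tableId = UUID.fromString("00000000-0000-0000-0000-000000000001");
        // Mirrors Lineage.of("table", tableId).upstream(3).downstream(2).fetch()
        System.out.println(lineageUri("http://localhost:8585/api", "table", tableId, 3, 2, false));
    }
}
```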
Adding Lineage
Use Lineage.connect() to create an edge between two entities:
Lineage.LineageEdge edge = Lineage.connect()
    .from("table", sourceTableId)
    .to("dashboard", dashboardId)
    .withDescription("Dashboard uses data from this table")
    .execute();
Column-Level Lineage
Map specific columns from the source entity to columns in the target entity:
Lineage.LineageEdge edge = Lineage.connect()
    .from("table", sourceTableId)
    .fromColumns("customer_id", "order_date")
    .to("table", targetTableId)
    .toColumns("cust_id", "date")
    .withDescription("ETL transformation")
    .execute();
The mapping is not pairwise: each column in toColumns is mapped to all of the columns in fromColumns. The resulting payload includes a columnsLineage array inside lineageDetails.
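To make the mapping rule concrete, the sketch below builds the columnsLineage entries for the example above as plain Java maps. It assumes each entry uses the fromColumns and toColumn fields of OpenMetadata's column lineage schema, where they normally hold fully qualified column names; the sketch keeps the short names passed to the builder. The class and method are hypothetical and not part of the SDK.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the columnsLineage array: every target column gets
 * one entry whose fromColumns list holds ALL of the source columns.
 */
final class ColumnLineageSketch {
    static List<Map<String, Object>> columnsLineage(List<String> fromColumns, List<String> toColumns) {
        List<Map<String, Object>> entries = new ArrayList<>();
        for (String toColumn : toColumns) {
            Map<String, Object> entry = new LinkedHashMap<>();
            entry.put("fromColumns", List.copyOf(fromColumns)); // same source list for every target
            entry.put("toColumn", toColumn);
            entries.add(entry);
        }
        return entries;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> result =
                columnsLineage(List.of("customer_id", "order_date"), List.of("cust_id", "date"));
        result.forEach(System.out::println);
    }
}
```

With the example inputs, both cust_id and date end up with fromColumns [customer_id, order_date]. If you need a strict one-to-one mapping, one option is to build the AddLineage map yourself and pass it to LineageAPI.addLineage.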
Pipeline Reference
Associate a pipeline entity that powers the transformation:
Lineage.LineageEdge edge = Lineage.connect()
    .from("table", sourceTableId)
    .to("table", targetTableId)
    .withPipeline("pipeline", pipelineId)
    .withDescription("Daily ETL job")
    .execute();
SQL Query
Attach the SQL query driving the lineage:
Lineage.LineageEdge edge = Lineage.connect()
    .from("table", sourceTableId)
    .to("table", targetTableId)
    .withSqlQuery("INSERT INTO target SELECT id, name FROM source")
    .execute();
Full Example
Combine all options:
Lineage.LineageEdge edge = Lineage.connect()
    .from("table", sourceTableId)
    .fromColumns("customer_id", "order_date")
    .to("table", targetTableId)
    .toColumns("cust_id", "date")
    .withPipeline("pipeline", pipelineId)
    .withDescription("ETL transformation")
    .withSqlQuery("SELECT customer_id, order_date FROM source")
    .execute();
Deleting Lineage
Remove a lineage edge between two entities:
Lineage.disconnect()
    .from("table", sourceTableId)
    .to("dashboard", dashboardId)
    .confirm();
Exporting Lineage
Export the lineage of an entity, identified by its fully qualified name, as a CSV string:
String csv = Lineage.export()
    .entity("table", "service.database.schema.my_table")
    .upstream(3)
    .downstream(2)
    .execute();
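Because execute() returns the export as a String, saving it is plain JDK I/O. A minimal sketch using java.nio.file.Files; the class name, the placeholder content, and the target path are hypothetical:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper: writes an exported lineage CSV string to disk. */
final class ExportToFileSketch {
    static Path save(String csv, Path target) throws IOException {
        Path parent = target.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent); // create missing directories first
        }
        // Overwrites an existing file (default options: CREATE, TRUNCATE_EXISTING, WRITE)
        return Files.writeString(target, csv, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Placeholder for the value returned by Lineage.export()...execute()
        String csv = "placeholder,csv\n";
        Path written = save(csv, Path.of("exports", "my_table_lineage.csv"));
        System.out.println("Wrote " + Files.size(written) + " bytes to " + written);
    }
}
```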
Impact Analysis
Find the downstream entities that a change to a given entity could affect, or the upstream entities it depends on:
Downstream Impact
Lineage.ImpactAnalysis impact = Lineage.impact()
    .of("table", tableId)
    .downstream()
    .depth(3)
    .analyze();
int totalImpacted = impact.getTotalImpactCount();
Upstream Impact
Lineage.ImpactAnalysis impact = Lineage.impact()
    .of("dashboard", dashboardId)
    .upstream()
    .depth(5)
    .analyze();
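As a mental model for what depth(...) controls, the sketch below counts the distinct entities reachable within a given number of hops over an in-memory adjacency map. It is a conceptual illustration with hypothetical entity names, not the SDK's or the server's implementation, and it assumes the impact count behaves like a depth-limited breadth-first traversal. For upstream analysis, pass a map of reversed edges.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Conceptual sketch: entities reachable from `start` within `depth` hops. */
final class ImpactSketch {
    static Set<String> impacted(Map<String, List<String>> edges, String start, int depth) {
        Set<String> seen = new HashSet<>();
        seen.add(start);
        Set<String> impacted = new HashSet<>();
        Deque<String> frontier = new ArrayDeque<>(List.of(start));
        for (int hop = 0; hop < depth && !frontier.isEmpty(); hop++) {
            Deque<String> next = new ArrayDeque<>();
            for (String node : frontier) {
                for (String neighbor : edges.getOrDefault(node, List.of())) {
                    if (seen.add(neighbor)) { // first visit only, so cycles terminate
                        impacted.add(neighbor);
                        next.add(neighbor);
                    }
                }
            }
            frontier = next;
        }
        return impacted;
    }

    public static void main(String[] args) {
        // Hypothetical downstream edges: orders -> orders_view -> sales_dashboard -> weekly_report
        Map<String, List<String>> downstream = Map.of(
                "orders", List.of("orders_view"),
                "orders_view", List.of("sales_dashboard"),
                "sales_dashboard", List.of("weekly_report"));
        System.out.println(impacted(downstream, "orders", 2).size()); // 2 (orders_view, sales_dashboard)
    }
}
```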
OpenLineage Events
The SDK also supports posting standard OpenLineage events, such as those emitted by Spark or Airflow, through the OpenLineage fluent API:
import org.openmetadata.sdk.fluent.OpenLineage;
OpenLineage.setDefaultClient(client);
// Post a single run event
String response = OpenLineage.event()
    .withEventType("COMPLETE")
    .withEventTime("2024-01-15T12:00:00Z")
    .withJob("my-etl-job", "my-namespace")
    .withRun("run-id-123")
    .addInput("source_table", "my-namespace")
    .addOutput("target_table", "my-namespace")
    .send();
Batch Events
// Build individual events
OpenLineage.RunEventBuilder event1 = OpenLineage.event()
    .withEventType("START")
    .withEventTime("2024-01-15T12:00:00Z")
    .withJob("job-1", "ns")
    .withRun("run-1");
OpenLineage.RunEventBuilder event2 = OpenLineage.event()
    .withEventType("COMPLETE")
    .withEventTime("2024-01-15T12:05:00Z")
    .withJob("job-1", "ns")
    .withRun("run-1");
// Send as a batch
String response = OpenLineage.batch()
    .addEvent(event1)
    .addEvent(event2)
    .send();
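Both batch events share one run ID, and the OpenLineage specification types runId as a UUID. The helper below generates a run ID and an ISO-8601 UTC event time with the JDK. The class name is hypothetical, and it assumes withRun(...) and withEventTime(...) accept plain strings, as in the examples.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.UUID;

/** Hypothetical helper producing values for withRun(...) and withEventTime(...). */
final class OpenLineageValues {
    // Generate one UUID per run and reuse it for every event (START, COMPLETE, ...) of that run.
    static String newRunId() {
        return UUID.randomUUID().toString();
    }

    // ISO-8601 UTC timestamp such as "2024-01-15T12:00:00Z", truncated to whole seconds.
    static String eventTime(Instant instant) {
        return instant.truncatedTo(ChronoUnit.SECONDS).toString();
    }

    public static void main(String[] args) {
        String runId = newRunId();
        System.out.println(runId + " " + eventTime(Instant.now()));
    }
}
```

Generate the run ID once per run and pass the same value to withRun(...) on every event of that run, the way event1 and event2 both use "run-1".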
Static API
For simpler use cases, the LineageAPI class in org.openmetadata.sdk.api provides static methods that wrap the underlying HTTP calls:
import org.openmetadata.sdk.api.LineageAPI;
// Get lineage by entity FQN
String lineage = LineageAPI.getLineage("service.db.schema.table");
// Get lineage with depth (upstream "3", downstream "2", passed as strings)
String lineageWithDepth = LineageAPI.getLineage("service.db.schema.table", "3", "2");
// Get lineage by entity type and ID
String entityLineage = LineageAPI.getEntityLineage("table", tableId);
// Add lineage (pass a Map matching the AddLineage JSON schema)
String addResult = LineageAPI.addLineage(lineageRequestMap);
// Delete lineage ("entityType:entityId" for each end of the edge)
String deleteResult = LineageAPI.deleteLineage("table:fromId", "dashboard:toId");
// Export lineage (FQN, entity type, upstream depth, downstream depth)
String csv = LineageAPI.exportLineage("service.db.schema.table", "table", "3", "2");
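LineageAPI.addLineage(...) takes a Map matching the AddLineage JSON schema. The sketch below builds such a Map with plain JDK collections. It assumes the schema's shape is an edge object holding fromEntity and toEntity references (each with id and type) plus an optional lineageDetails object containing sqlQuery; verify the field names against the schema for your OpenMetadata version. The class, method names, and ID strings are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical builder for the Map passed to LineageAPI.addLineage(...).
 * Assumed shape: { "edge": { "fromEntity": {id, type}, "toEntity": {id, type}, "lineageDetails": {...} } }
 */
final class AddLineageRequestSketch {
    static Map<String, Object> entityRef(String type, String id) {
        return Map.of("id", id, "type", type);
    }

    static Map<String, Object> request(String fromType, String fromId,
                                       String toType, String toId, String sqlQuery) {
        Map<String, Object> edge = new LinkedHashMap<>();
        edge.put("fromEntity", entityRef(fromType, fromId));
        edge.put("toEntity", entityRef(toType, toId));
        if (sqlQuery != null) {
            // Only add lineageDetails when there is something to put in it
            Map<String, Object> details = new LinkedHashMap<>();
            details.put("sqlQuery", sqlQuery);
            edge.put("lineageDetails", details);
        }
        return Map.of("edge", edge);
    }

    public static void main(String[] args) {
        Map<String, Object> lineageRequestMap = request(
                "table", "source-table-uuid", "table", "target-table-uuid",
                "INSERT INTO target SELECT id, name FROM source");
        System.out.println(lineageRequestMap);
        // With the SDK on the classpath: LineageAPI.addLineage(lineageRequestMap);
    }
}
```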
Async Variants
All static methods have CompletableFuture-based async variants:
import java.util.concurrent.CompletableFuture;
CompletableFuture<String> future = LineageAPI.getLineageAsync("service.db.schema.table");
future.thenAccept(lineage -> System.out.println("Lineage: " + lineage));
CompletableFuture<String> addFuture = LineageAPI.addLineageAsync(lineageRequestMap);
CompletableFuture<String> deleteFuture = LineageAPI.deleteLineageAsync("table:id1", "dashboard:id2");
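Because the async variants return CompletableFuture, standard JDK composition applies. The sketch below uses a hypothetical stand-in, fakeGetLineageAsync, in place of LineageAPI.getLineageAsync so it runs without a server. It adds a timeout with a fallback value, and it starts several requests before waiting on any of them.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

final class AsyncLineageSketch {
    // Hypothetical stand-in for LineageAPI.getLineageAsync(fqn) so the sketch runs without the SDK.
    static CompletableFuture<String> fakeGetLineageAsync(String fqn) {
        return CompletableFuture.supplyAsync(() -> "{\"entity\":\"" + fqn + "\"}");
    }

    static CompletableFuture<String> withTimeoutAndFallback(CompletableFuture<String> call,
                                                            long seconds, String fallback) {
        return call
                .orTimeout(seconds, TimeUnit.SECONDS) // fail with TimeoutException if too slow
                .exceptionally(error -> fallback);    // replace any failure with the fallback value
    }

    static List<String> fetchAll(List<String> fqns) {
        // Start every request first so they run concurrently...
        List<CompletableFuture<String>> futures = fqns.stream()
                .map(AsyncLineageSketch::fakeGetLineageAsync)
                .collect(Collectors.toList());
        // ...then wait for each result, preserving input order.
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String result = withTimeoutAndFallback(
                fakeGetLineageAsync("service.db.schema.table"), 30, "{}").join();
        System.out.println(result);
        System.out.println(fetchAll(List.of("service.db.schema.a", "service.db.schema.b")));
    }
}
```

Note that orTimeout completes the returned future with a TimeoutException after the deadline but does not cancel the underlying work.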