Lineage Ingestion
Lineage ingestion extracts data-flow relationships from SQL queries — which tables feed which other tables, down to individual columns. It parses query logs, view definitions, and stored procedures to build a lineage graph that is published to OpenMetadata.
Pipeline Overview
Lineage uses the same query log sources as usage ingestion, but processes queries differently — it analyzes directional data flow (source → target) rather than counting references.
┌──────────────────────────────────────────────────────────┐
│ 1. LineageSource │
│ source/database/lineage_source.py │
│ │
│ Fetches query logs filtered for lineage-relevant types: │
│ CREATE TABLE AS SELECT, INSERT...SELECT, MERGE, │
│ UPDATE...FROM, view definitions, stored procedures │
│ │
│ Yields: Either[AddLineageRequest] │
└──────────────────────┬───────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ MetadataRestSink │
│ sink/metadata_rest.py │
│ │
│ PUT /api/v1/lineage │
│ (merges column lineage with existing edges) │
└──────────────────────────────────────────────────────────┘
Unlike the 5-stage usage pipeline, lineage processing happens inside the source — the LineageSource parses queries, builds the lineage graph, resolves entities, and yields ready-to-publish AddLineageRequest objects.
# 1. SOURCE fetches and chunks query logs
LineageSource._iter()
└── yield_query_lineage()
├── Fetch query logs from database (or file)
├── Chunk into batches of 200
└── Process chunks in parallel threads
# 2. PROCESSOR parses each query (runs in thread pool)
└── query_lineage_processor(query) # lineage_processors.py
└── get_lineage_by_query(query, ...) # lineage/sql_lineage.py
├── LineageParser(query, dialect) # lineage/parser.py
│ ├── Try SqlGlot (30s timeout)
│ ├── Fallback SqlFluff (30s timeout)
│ └── Fallback SqlParse
│
├── Extract: source_tables, target_tables, intermediate_tables
├── Extract: column_lineage (column-to-column mappings)
│
└── For each (source, target) pair:
├── search_table_entities(source) # ES + API lookup
├── search_table_entities(target)
├── Build column lineage mappings
└── yield AddLineageRequest
# 3. VIEW LINEAGE (separate pass)
└── yield_view_lineage()
└── view_lineage_processor(view)
└── get_view_lineage(view_definition, ...)
├── Parse CREATE VIEW ... AS SELECT ...
├── Extract source tables from SELECT
└── yield AddLineageRequest (source tables → view)
SQL Parsing
LineageParser
LineageParser (lineage/parser.py) is the core SQL analysis engine shared with usage ingestion. For lineage, it extracts directional information:
parser = LineageParser(query="INSERT INTO target SELECT * FROM source", dialect=Dialect.ANSI)
parser.source_tables # ["source"] — tables being read
parser.target_tables # ["target"] — tables being written
parser.intermediate_tables # [] — staging/temp tables
parser.column_lineage # [(source.col1, target.col1), ...]
parser.table_aliases # {"s": "source"}
Cascade Parsing Strategy
Three parsers are tried in order, each with a 30-second timeout and a 100 MB memory limit:
- SqlGlot — preferred for accuracy, handles most dialects
- SqlFluff — fallback, good for complex SQL
- SqlParse — final fallback, always succeeds (less accurate)
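The cascade can be sketched as follows. This is a simplified, hypothetical version: the stub parsers stand in for the real SqlGlot/SqlFluff/SqlParse wrappers, and the actual implementation also enforces the memory cap and handles stuck workers more carefully.

```python
from concurrent.futures import ThreadPoolExecutor


def parse_with_sqlglot(query: str) -> dict:
    # Stand-in for the preferred parser; assume it rejects this dialect.
    raise ValueError("unsupported syntax")


def parse_with_sqlfluff(query: str) -> dict:
    # Stand-in for the first fallback.
    raise ValueError("still unsupported")


def parse_with_sqlparse(query: str) -> dict:
    # Final fallback: always produces a (possibly less accurate) result.
    return {"source_tables": [], "target_tables": []}


def cascade_parse(query: str, timeout: float = 30.0) -> dict:
    """Try each parser in order, falling through on error or timeout."""
    for parser in (parse_with_sqlglot, parse_with_sqlfluff, parse_with_sqlparse):
        with ThreadPoolExecutor(max_workers=1) as pool:
            try:
                return pool.submit(parser, query).result(timeout=timeout)
            except Exception:  # parse error or concurrent.futures.TimeoutError
                continue
    return {}
```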
Query Cleaning
Before parsing, queries are cleaned:
- Remove COPY GRANTS (Snowflake)
- Remove MERGE...WHEN MATCHED clauses (too complex for parsers)
- Filter out CREATE TRIGGER/FUNCTION/PROCEDURE (no lineage value)
- Normalize escape sequences
Query Masking
QueryMasker (lineage/masker.py) replaces literal values with ? before storing queries in lineage details — prevents sensitive data leakage while preserving query structure.
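A minimal sketch of literal masking, assuming a regex-based approach — the real QueryMasker is more dialect-aware than this:

```python
import re

# Match single-quoted string literals (with '' escapes) and numeric literals.
# Word boundaries keep digits inside identifiers like col1 untouched.
_LITERALS = re.compile(
    r"""
    '(?:[^']|'')*'       # single-quoted string literals
    | \b\d+(?:\.\d+)?\b  # integer and decimal literals
    """,
    re.VERBOSE,
)


def mask_query(query: str) -> str:
    """Replace literal values with '?' while preserving query structure."""
    return _LITERALS.sub("?", query)
```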
Lineage-Relevant Query Types
Not all queries produce lineage. Each database connector filters for specific query types:
| Query Pattern | Lineage Produced |
|---|---|
| CREATE TABLE AS SELECT ... | source tables → new table |
| INSERT INTO ... SELECT ... | source tables → target table |
| UPDATE ... FROM ... | source tables → target table |
| MERGE INTO ... USING ... | source table → target table |
| CREATE VIEW AS ... | source tables → view |
| Stored procedure body | varies (parsed recursively) |
Queries like plain SELECT, DROP, or DDL without data movement are filtered out.
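A hypothetical filter in this spirit — the real connectors use per-dialect patterns, so these regexes are illustrative only:

```python
import re

LINEAGE_PATTERNS = [
    re.compile(r"^\s*CREATE\s+(?:OR\s+REPLACE\s+)?(?:TEMP(?:ORARY)?\s+)?TABLE\s+.+?\s+AS\s+SELECT", re.I | re.S),
    re.compile(r"^\s*INSERT\s+INTO\s+.+SELECT", re.I | re.S),
    re.compile(r"^\s*MERGE\s+INTO", re.I),
    re.compile(r"^\s*UPDATE\s+.+\s+FROM\s+", re.I | re.S),
    re.compile(r"^\s*CREATE\s+(?:OR\s+REPLACE\s+)?VIEW", re.I),
]


def is_lineage_query(query: str) -> bool:
    """True if the query moves data between tables (and so produces lineage)."""
    return any(p.search(query) for p in LINEAGE_PATTERNS)
```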
Graph-Based Lineage Analysis
lineage/sql_lineage.py uses NetworkX directed graphs to handle complex lineage scenarios.
Direct Lineage
For a simple query (INSERT INTO target SELECT * FROM source), a single AddLineageRequest is created.
For queries involving temp/staging tables:
CREATE TEMP TABLE staging AS SELECT * FROM source;
INSERT INTO target SELECT * FROM staging;
The graph captures:
source ──→ staging ──→ target
get_lineage_by_graph() traces paths through the graph:
- Find weakly connected components
- Extract root-to-leaf paths (max depth: 20)
- Create lineage request for each hop
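The traversal above can be sketched with NetworkX. Node names are illustrative; the real code operates on resolved table entities:

```python
import networkx as nx

# Two statements from the staging example above become two edges.
g = nx.DiGraph()
g.add_edge("source", "staging")   # CREATE TEMP TABLE staging AS SELECT ...
g.add_edge("staging", "target")   # INSERT INTO target SELECT * FROM staging

edges = []
for component in nx.weakly_connected_components(g):
    sub = g.subgraph(component)
    roots = [n for n in sub if sub.in_degree(n) == 0]
    leaves = [n for n in sub if sub.out_degree(n) == 0]
    for root in roots:
        for leaf in leaves:
            for path in nx.all_simple_paths(sub, root, leaf, cutoff=20):
                # One lineage request per hop along the path
                edges.extend(zip(path, path[1:]))
```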
Column-Level Lineage
For each (source, target) edge, the parser maps individual columns:
column_lineage = [
ColumnLineage(
fromColumns=["service.db.schema.source.col1"],
toColumn="service.db.schema.target.col1"
),
ColumnLineage(
fromColumns=["service.db.schema.source.col2"],
toColumn="service.db.schema.target.col2"
),
]
Handles:
- Simple column mappings (col1 → col1)
- Renamed columns (source.old_name → target.new_name)
- Star selects (* → individual columns)
- Expressions (source.a + source.b → target.sum_ab)
- Intermediate column mappings through staging tables
Entity Resolution
Before publishing, table names from SQL must be resolved to OpenMetadata entities:
search_table_entities(service_name, database, schema, table)
1. Search Elasticsearch (fast) for table FQN
2. If not found, try API search (reliable)
3. If schema not found, retry without schema qualifier
4. Cache results (1,000-entry LRU cache)
This resolution step is critical — lineage can only be created between entities that exist in the catalog.
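A minimal sketch of the resolution cascade, with stubbed lookups standing in for the Elasticsearch and API calls (return values are made up to exercise the fallback path):

```python
from functools import lru_cache


def search_es(service, database, schema, table):
    return None  # pretend ES has not indexed this table


def search_api(service, database, schema, table):
    if schema is not None:
        return None  # pretend the schema qualifier from SQL is wrong
    return f"{service}.{database}.<any>.{table}"


@lru_cache(maxsize=1000)  # mirrors the 1,000-entry LRU cache
def search_table_entities(service, database, schema, table):
    """Resolve a table name to an entity FQN: ES first, then API,
    then retry without the schema qualifier."""
    for lookup in (search_es, search_api):
        entity = lookup(service, database, schema, table)
        if entity:
            return entity
    if schema is not None:
        return search_table_entities(service, database, None, table)
    return None
```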
Cross-Database Lineage
When processCrossDatabaseLineage is enabled:
# Query references tables from multiple services:
INSERT INTO warehouse.analytics.summary
SELECT * FROM production.sales.orders
# Resolution searches across multiple services:
search_table_entities(
service_names=["warehouse_service", "production_service"],
...
)
Configured via crossDatabaseServiceNames in the workflow config.
Stored Procedure Lineage
lineage_processors.py handles lineage from stored procedures:
- Fetch stored procedure definitions
- Extract individual SQL statements from the procedure body
- Filter for lineage-relevant statements (is_lineage_query())
- Build a directed graph of the procedure’s data flow
- For each lineage edge, link to the stored procedure entity as the pipeline reference
The resulting lineage includes pipeline=EntityReference(type="storedProcedure") to show which procedure creates the data flow.
View Lineage
Views are processed separately:
view_lineage_processor(view_entity)
1. Get view definition SQL
2. Parse with LineageParser
3. Extract source tables from the SELECT
4. Create lineage: source tables → view
5. Override existing view lineage (views can be redefined)
External Table Lineage
ExternalTableLineageMixin (source/database/external_table_lineage_mixin.py) links external tables (S3, GCS paths) to database tables:
- Search for container entities by storage path
- Map columns between container and table
- Source: LineageSource.ExternalTableLineage
Parallel Processing
LineageSource processes queries in parallel for performance:
Query logs → Chunk into batches of 200
│
┌──────────────┼──────────────┐
▼ ▼ ▼
Thread 1 Thread 2 Thread 3
(chunk 1) (chunk 2) (chunk 3)
│ │ │
└──────────────┼──────────────┘
▼
Result queue → yield AddLineageRequest
Thread count is configurable (default: CPU count). Each thread gets its own parser instances.
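The chunk-and-fan-out pattern might look like this; process_chunk is a stand-in for the per-chunk lineage processor, not the real function:

```python
import os
from concurrent.futures import ThreadPoolExecutor

CHUNK_SIZE = 200  # batch size from the diagram above


def chunked(items, size=CHUNK_SIZE):
    """Split a list of queries into fixed-size batches."""
    for i in range(0, len(items), size):
        yield items[i : i + size]


def process_chunk(queries):
    # Parse each query and return its lineage requests (stubbed here).
    return [f"lineage({q})" for q in queries]


def process_query_log(queries, threads=os.cpu_count()):
    """Fan chunks out to a thread pool and yield results in order."""
    with ThreadPoolExecutor(max_workers=threads) as pool:
        for result in pool.map(process_chunk, chunked(list(queries))):
            yield from result
```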
AddLineageRequest Structure
AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=source_id, type="table"),
toEntity=EntityReference(id=target_id, type="table"),
lineageDetails=LineageDetails(
sqlQuery="INSERT INTO target SELECT ...", # Masked
source=LineageSource.QueryLineage,
columnsLineage=[
ColumnLineage(
fromColumns=["db.schema.source.col1"],
toColumn="db.schema.target.col1"
),
],
pipeline=EntityReference(...) # Optional: stored procedure
)
)
)
Publishing
The sink merges lineage with existing edges:
add_lineage(request, check_patch=False)
1. Check if edge already exists
2. If exists: merge column lineage (union of old + new columns)
3. If new: create edge
4. Cache the result
For views, write_override_lineage() replaces existing lineage entirely (since view definitions can change).
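The column-merge rule in step 2 amounts to a union keyed by target column. A sketch using plain dicts in place of ColumnLineage objects:

```python
def merge_column_lineage(existing, incoming):
    """Union old and new fromColumns for each target column."""
    by_target = {c["toColumn"]: set(c["fromColumns"]) for c in existing}
    for col in incoming:
        by_target.setdefault(col["toColumn"], set()).update(col["fromColumns"])
    return [
        {"fromColumns": sorted(froms), "toColumn": target}
        for target, froms in by_target.items()
    ]
```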
Dialect Mapping
lineage/models.py maps each database connection type to a SQL dialect for correct parsing:
| Connection Type | Dialect |
|---|---|
| Postgres, Redshift | POSTGRES |
| MySQL, MariaDB | MYSQL |
| Snowflake | SNOWFLAKE |
| BigQuery | BIGQUERY |
| SQL Server, Azure SQL | TSQL |
| Hive, Spark, Databricks | SPARKSQL |
| Oracle | ORACLE |
| Trino, Presto | TRINO |
| + 20 more | … |
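Conceptually the mapping is a lookup table with a safe default. The key names below are illustrative, not the exact enum members in lineage/models.py:

```python
# Illustrative subset of the connection-type → dialect mapping.
DIALECT_BY_CONNECTION = {
    "Postgres": "POSTGRES",
    "Redshift": "POSTGRES",
    "Snowflake": "SNOWFLAKE",
    "BigQuery": "BIGQUERY",
    "Mssql": "TSQL",
    "Databricks": "SPARKSQL",
}


def dialect_for(connection_type: str, default: str = "ANSI") -> str:
    """Pick the SQL dialect for a connection type, defaulting to ANSI."""
    return DIALECT_BY_CONNECTION.get(connection_type, default)
```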
Configuration
source:
type: postgres-lineage # Database-specific lineage source
serviceName: my_postgres
sourceConfig:
config:
type: DatabaseLineage
queryLogDuration: 1 # Days to look back
resultLimit: 100000 # Max queries to fetch
parsingTimeoutLimit: 30 # Seconds per query
processCrossDatabaseLineage: false
crossDatabaseServiceNames: []
threads: 4 # Parallel processing threads
queryLogFilePath: "" # Optional: CSV file path
Key Design Patterns
| Pattern | Where | Why |
|---|---|---|
| Directed graph | NetworkX in sql_lineage.py | Handle intermediate tables and complex data flows |
| Cascade parsing | SqlGlot → SqlFluff → SqlParse | Maximize success across SQL dialects |
| Entity resolution | ES + API fallback with LRU cache | Fast lookup with reliability guarantee |
| Chunked parallelism | Thread pool with 200-query chunks | Bounded memory + CPU utilization |
| Query masking | QueryMasker before storage | Prevent sensitive data leakage |
Key Files Quick Reference
| What you want to do | Start here |
|---|---|
| Understand the lineage workflow | workflow/lineage.py |
| See how query logs are fetched and filtered | source/database/lineage_source.py |
| See database-specific filters | source/database/{dialect}/lineage.py |
| Read the SQL parser | lineage/parser.py → LineageParser |
| Understand graph-based analysis | lineage/sql_lineage.py → get_lineage_by_query() |
| See query masking | lineage/masker.py |
| See dialect mappings | lineage/models.py |
| See stored procedure processing | source/database/lineage_processors.py |
| See view lineage | source/database/lineage_processors.py → view_lineage_processor() |
| See external table lineage | source/database/external_table_lineage_mixin.py |
| See how lineage is published | ometa/mixins/lineage_mixin.py → add_lineage() |
| See lineage sink logic | sink/metadata_rest.py → write_lineage() |
All paths above are relative to ingestion/src/metadata/ingestion/. For example, lineage/parser.py means ingestion/src/metadata/ingestion/lineage/parser.py.