
Lineage Ingestion

Lineage ingestion extracts data flow relationships from SQL queries — which tables feed into which other tables, and at what column granularity. It parses query logs, view definitions, and stored procedures to build a lineage graph published to OpenMetadata.

Pipeline Overview

Lineage uses the same query log sources as usage ingestion, but processes queries differently — it analyzes directional data flow (source → target) rather than counting references.
┌──────────────────────────────────────────────────────────┐
│  1. LineageSource                                        │
│  source/database/lineage_source.py                       │
│                                                          │
│  Fetches query logs filtered for lineage-relevant types: │
│  CREATE TABLE AS SELECT, INSERT...SELECT, MERGE,         │
│  UPDATE...FROM, view definitions, stored procedures      │
│                                                          │
│  Yields: Either[AddLineageRequest]                       │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  MetadataRestSink                                        │
│  sink/metadata_rest.py                                   │
│                                                          │
│  PUT /api/v1/lineage                                     │
│  (merges column lineage with existing edges)             │
└──────────────────────────────────────────────────────────┘
Unlike the 5-stage usage pipeline, lineage processing happens inside the source — the LineageSource parses queries, builds the lineage graph, resolves entities, and yields ready-to-publish AddLineageRequest objects.

Lineage Extraction Call Chain

# 1. SOURCE fetches and chunks query logs
LineageSource._iter()
    └── yield_query_lineage()
        ├── Fetch query logs from database (or file)
        ├── Chunk into batches of 200
        └── Process chunks in parallel threads

# 2. PROCESSOR parses each query (runs in thread pool)
    └── query_lineage_processor(query)           # lineage_processors.py
        └── get_lineage_by_query(query, ...)     # lineage/sql_lineage.py
            ├── LineageParser(query, dialect)     # lineage/parser.py
            │   ├── Try SqlGlot (30s timeout)
            │   ├── Fallback SqlFluff (30s timeout)
            │   └── Fallback SqlParse

            ├── Extract: source_tables, target_tables, intermediate_tables
            ├── Extract: column_lineage (column-to-column mappings)

            └── For each (source, target) pair:
                ├── search_table_entities(source)  # ES + API lookup
                ├── search_table_entities(target)
                ├── Build column lineage mappings
                └── yield AddLineageRequest

# 3. VIEW LINEAGE (separate pass)
    └── yield_view_lineage()
        └── view_lineage_processor(view)
            └── get_view_lineage(view_definition, ...)
                ├── Parse CREATE VIEW ... AS SELECT ...
                ├── Extract source tables from SELECT
                └── yield AddLineageRequest (source tables → view)

SQL Parsing

LineageParser

LineageParser (lineage/parser.py) is the core SQL analysis engine shared with usage ingestion. For lineage, it extracts directional information:
parser = LineageParser(query="INSERT INTO target SELECT * FROM source", dialect=Dialect.ANSI)

parser.source_tables         # ["source"]       — tables being read
parser.target_tables         # ["target"]       — tables being written
parser.intermediate_tables   # []               — staging/temp tables
parser.column_lineage        # [(source.col1, target.col1), ...]
parser.table_aliases         # {"s": "source"}

Cascade Parsing Strategy

Three parsers are tried in order, with 30-second timeouts and 100MB memory limits each:
  1. SqlGlot — preferred for accuracy, handles most dialects
  2. SqlFluff — fallback, good for complex SQL
  3. SqlParse — final fallback, always succeeds (less accurate)
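The cascade can be sketched as a loop over parser callables that falls through on failure. This is an illustrative sketch, not the actual LineageParser code: the stand-in parser functions and the thread-based timeout are assumptions, and the memory limit is omitted.

```python
from concurrent.futures import ThreadPoolExecutor


def parse_with_timeout(parser, query, timeout=30):
    """Run a parser callable, raising TimeoutError past the limit."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(parser, query).result(timeout=timeout)


def cascade_parse(query, parsers, timeout=30):
    """Return the first successful parse result, or None if all fail."""
    for parser in parsers:
        try:
            return parse_with_timeout(parser, query, timeout)
        except Exception:  # parse failure or timeout: fall through
            continue
    return None


# Stand-ins for SqlGlot / SqlFluff / SqlParse:
def strict_parser(query):
    raise ValueError("unsupported dialect")


def lenient_parser(query):
    return {"tables": ["source", "target"]}


result = cascade_parse("INSERT INTO target SELECT * FROM source",
                       [strict_parser, lenient_parser])
```

Because SqlParse never raises, the real cascade always produces some result, trading accuracy for coverage.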

Query Cleaning

Before parsing, queries are cleaned:
  • Remove COPY GRANTS (Snowflake)
  • Remove MERGE...WHEN MATCHED clauses (too complex for parsers)
  • Filter out CREATE TRIGGER/FUNCTION/PROCEDURE (no lineage value)
  • Normalize escape sequences

Query Masking

QueryMasker (lineage/masker.py) replaces literal values with ? before storing queries in lineage details — prevents sensitive data leakage while preserving query structure.
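The effect can be approximated with two regex substitutions. This is a simplified stand-in, not the real QueryMasker, which works on the SQL parser's token stream rather than raw text:

```python
import re


def mask_query(query: str) -> str:
    """Replace string and numeric literals with ? placeholders."""
    query = re.sub(r"'(?:[^']|'')*'", "?", query)   # string literals
    query = re.sub(r"\b\d+(\.\d+)?\b", "?", query)  # numeric literals
    return query


masked = mask_query(
    "INSERT INTO t SELECT * FROM s WHERE id = 42 AND name = 'Ada'"
)
# → "INSERT INTO t SELECT * FROM s WHERE id = ? AND name = ?"
```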

Lineage-Relevant Query Types

Not all queries produce lineage. Each database connector filters for specific query types:
  Query pattern                 Lineage produced
  CREATE TABLE AS SELECT ...    source tables → new table
  INSERT INTO ... SELECT ...    source tables → target table
  UPDATE ... FROM ...           source tables → target table
  MERGE INTO ... USING ...      source table → target table
  CREATE VIEW AS ...            source tables → view
  Stored procedure body         varies (parsed recursively)
Queries like plain SELECT, DROP, or DDL without data movement are filtered out.
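A filter of this shape can be sketched with pattern matching over the query text. The patterns below restate the table above; the real connector-specific filters live in source/database/{dialect}/lineage.py and the regexes here are illustrative:

```python
import re

# Patterns for query types that move data (and therefore produce lineage)
LINEAGE_PATTERNS = [
    r"^\s*CREATE\s+(OR\s+REPLACE\s+)?TABLE\s+.*\s+AS\s+SELECT",
    r"^\s*INSERT\s+INTO\s+.*SELECT",
    r"^\s*UPDATE\s+.*\s+FROM\s+",
    r"^\s*MERGE\s+INTO\s+",
    r"^\s*CREATE\s+(OR\s+REPLACE\s+)?VIEW\s+",
]


def is_lineage_query(query: str) -> bool:
    """True if the query matches a data-movement pattern."""
    return any(
        re.match(p, query.strip(), re.IGNORECASE | re.DOTALL)
        for p in LINEAGE_PATTERNS
    )
```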

Graph-Based Lineage Analysis

lineage/sql_lineage.py uses NetworkX directed graphs to handle complex lineage scenarios.

Direct Lineage

For simple queries (INSERT INTO target SELECT FROM source):
source ──→ target
A single AddLineageRequest is created.

Intermediate Table Lineage

For queries involving temp/staging tables:
CREATE TEMP TABLE staging AS SELECT * FROM source;
INSERT INTO target SELECT * FROM staging;
The graph captures:
source ──→ staging ──→ target
get_lineage_by_graph() traces paths through the graph:
  1. Find weakly connected components
  2. Extract root-to-leaf paths (max depth: 20)
  3. Create lineage request for each hop
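The path-tracing step can be sketched in pure Python (the real code builds a NetworkX DiGraph, but the logic is the same): build adjacency from the (source, target) pairs, walk every root-to-leaf path up to the depth cap, and emit one lineage edge per hop.

```python
from collections import defaultdict


def lineage_hops(edges, max_depth=20):
    """Return one (source, target) hop per edge on each root-to-leaf path."""
    children = defaultdict(list)
    has_parent = set()
    nodes = set()
    for src, dst in edges:
        children[src].append(dst)
        has_parent.add(dst)
        nodes.update((src, dst))
    roots = [n for n in nodes if n not in has_parent]

    hops = []

    def walk(path):
        node = path[-1]
        # stop at a leaf or at the depth cap, then record each hop
        if not children[node] or len(path) > max_depth:
            hops.extend(zip(path, path[1:]))
            return
        for child in children[node]:
            walk(path + [child])

    for root in roots:
        walk([root])
    return hops


hops = lineage_hops([("source", "staging"), ("staging", "target")])
# → [('source', 'staging'), ('staging', 'target')]
```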

Column-Level Lineage

For each (source, target) edge, the parser maps individual columns:
column_lineage = [
    ColumnLineage(
        fromColumns=["service.db.schema.source.col1"],
        toColumn="service.db.schema.target.col1"
    ),
    ColumnLineage(
        fromColumns=["service.db.schema.source.col2"],
        toColumn="service.db.schema.target.col2"
    ),
]
Handles:
  • Simple column mappings (col1 → col1)
  • Renamed columns (source.old_name → target.new_name)
  • Star selects (* → individual columns)
  • Expressions (source.a + source.b → target.sum_ab)
  • Intermediate column mappings through staging tables
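The star-select case can be sketched as a positional expansion: each source column maps to the target column of the same name. The function name and dict shape are illustrative, loosely mirroring the ColumnLineage structure above:

```python
def expand_star(source_fqn, target_fqn, source_columns):
    """Expand SELECT * into one column mapping per source column."""
    return [
        {
            "fromColumns": [f"{source_fqn}.{col}"],
            "toColumn": f"{target_fqn}.{col}",
        }
        for col in source_columns
    ]


mappings = expand_star("db.schema.source", "db.schema.target",
                       ["col1", "col2"])
```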

Entity Resolution

Before publishing, table names from SQL must be resolved to OpenMetadata entities:
search_table_entities(service_name, database, schema, table)
  1. Search Elasticsearch (fast) for table FQN
  2. If not found, try API search (reliable)
  3. If schema not found, retry without schema qualifier
  4. Cache results (1,000-entry LRU cache)
This resolution step is critical — lineage can only be created between entities that exist in the catalog.
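The resolution order can be sketched as follows. The lookup dicts stand in for the real Elasticsearch and REST API calls, so names and return shapes here are assumptions; only the ordering and the LRU cache mirror the documented behavior:

```python
from functools import lru_cache

# Stand-ins for the ES index and the catalog API
ES_INDEX = {("svc", "db", "public", "orders"): "svc.db.public.orders"}
API_CATALOG = {("svc", "db", None, "events"): "svc.db.events"}


@lru_cache(maxsize=1000)  # mirrors the 1,000-entry LRU cache
def search_table_entities(service, database, schema, table):
    key = (service, database, schema, table)
    if key in ES_INDEX:        # 1. fast Elasticsearch lookup
        return ES_INDEX[key]
    if key in API_CATALOG:     # 2. reliable API search
        return API_CATALOG[key]
    if schema is not None:     # 3. retry without the schema qualifier
        return search_table_entities(service, database, None, table)
    return None                # not in the catalog: no lineage possible
```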

Cross-Database Lineage

When processCrossDatabaseLineage is enabled:
# Query references tables from multiple services:
INSERT INTO warehouse.analytics.summary
SELECT * FROM production.sales.orders

# Resolution searches across multiple services:
search_table_entities(
    service_names=["warehouse_service", "production_service"],
    ...
)
Configured via crossDatabaseServiceNames in the workflow config.

Stored Procedure Lineage

lineage_processors.py handles lineage from stored procedures:
  1. Fetch stored procedure definitions
  2. Extract individual SQL statements from the procedure body
  3. Filter for lineage-relevant statements (is_lineage_query())
  4. Build a directed graph of the procedure’s data flow
  5. For each lineage edge, link to the stored procedure entity as the pipeline reference
The resulting lineage includes pipeline=EntityReference(type="storedProcedure") to show which procedure creates the data flow.

View Lineage

Views are processed separately:
view_lineage_processor(view_entity)
  1. Get view definition SQL
  2. Parse with LineageParser
  3. Extract source tables from the SELECT
  4. Create lineage: source tables → view
  5. Override existing view lineage (views can be redefined)

External Table Lineage

ExternalTableLineageMixin (source/database/external_table_lineage_mixin.py) links external tables (S3, GCS paths) to database tables:
  1. Search for container entities by storage path
  2. Map columns between container and table
  3. Source: LineageSource.ExternalTableLineage

Parallel Processing

LineageSource processes queries in parallel for performance:
Query logs → Chunk into batches of 200
                    │
     ┌──────────────┼──────────────┐
     ▼              ▼              ▼
  Thread 1       Thread 2       Thread 3
  (chunk 1)      (chunk 2)      (chunk 3)
     │              │              │
     └──────────────┼──────────────┘
                    │
                    ▼
            Result queue → yield AddLineageRequest
Thread count is configurable (default: CPU count). Each thread gets its own parser instances.
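The fan-out can be sketched with a thread pool over 200-query batches. The chunking helper and the stand-in `process_chunk` are illustrative, not the actual lineage processor:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

CHUNK_SIZE = 200


def chunked(iterable, size):
    """Yield successive fixed-size batches from an iterable."""
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch


def process_chunk(queries):
    # stand-in for running the lineage processor over one batch
    return [q.upper() for q in queries]


queries = [f"insert into t{i} select * from s{i}" for i in range(500)]
with ThreadPoolExecutor(max_workers=4) as pool:  # threads: 4
    results = [
        r
        for batch in pool.map(process_chunk, chunked(queries, CHUNK_SIZE))
        for r in batch
    ]
```

Chunking bounds per-thread memory while keeping all workers busy; `pool.map` preserves batch order when collecting results.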

AddLineageRequest Structure

AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=source_id, type="table"),
        toEntity=EntityReference(id=target_id, type="table"),
        lineageDetails=LineageDetails(
            sqlQuery="INSERT INTO target SELECT ...",  # Masked
            source=LineageSource.QueryLineage,
            columnsLineage=[
                ColumnLineage(
                    fromColumns=["db.schema.source.col1"],
                    toColumn="db.schema.target.col1"
                ),
            ],
            pipeline=EntityReference(...)  # Optional: stored procedure
        )
    )
)

Publishing

The sink merges lineage with existing edges:
add_lineage(request, check_patch=False)
  1. Check if edge already exists
  2. If exists: merge column lineage (union of old + new columns)
  3. If new: create edge
  4. Cache the result
For views, write_override_lineage() replaces existing lineage entirely (since view definitions can change).
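The merge step for column lineage can be sketched as a union keyed by target column. The dict shape is illustrative (the real code works with ColumnLineage models):

```python
def merge_column_lineage(existing, incoming):
    """Union old and new column mappings, keyed by target column."""
    merged = {c["toColumn"]: dict(c) for c in existing}
    for col in incoming:
        if col["toColumn"] in merged:
            # union of source columns for the same target column
            merged[col["toColumn"]]["fromColumns"] = sorted(
                set(merged[col["toColumn"]]["fromColumns"])
                | set(col["fromColumns"])
            )
        else:
            merged[col["toColumn"]] = dict(col)
    return list(merged.values())


old = [{"toColumn": "t.c1", "fromColumns": ["s.a"]}]
new = [{"toColumn": "t.c1", "fromColumns": ["s.b"]},
       {"toColumn": "t.c2", "fromColumns": ["s.c"]}]
merged = merge_column_lineage(old, new)
```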

Dialect Mapping

lineage/models.py maps each database connection type to a SQL dialect for correct parsing:
  Connection type            Dialect
  Postgres, Redshift         POSTGRES
  MySQL, MariaDB             MYSQL
  Snowflake                  SNOWFLAKE
  BigQuery                   BIGQUERY
  SQL Server, Azure SQL      TSQL
  Hive, Spark, Databricks    SPARKSQL
  Oracle                     ORACLE
  Trino, Presto              TRINO
  + 20 more

Configuration

source:
  type: postgres-lineage          # Database-specific lineage source
  serviceName: my_postgres
  sourceConfig:
    config:
      type: DatabaseLineage
      queryLogDuration: 1           # Days to look back
      resultLimit: 100000           # Max queries to fetch
      parsingTimeoutLimit: 30       # Seconds per query
      processCrossDatabaseLineage: false
      crossDatabaseServiceNames: []
      threads: 4                    # Parallel processing threads
      queryLogFilePath: ""          # Optional: CSV file path

Key Design Patterns

  Pattern              Where                               Why
  Directed graph       NetworkX in sql_lineage.py          Handle intermediate tables and complex data flows
  Cascade parsing      SqlGlot → SqlFluff → SqlParse       Maximize success across SQL dialects
  Entity resolution    ES + API fallback with LRU cache    Fast lookup with a reliability guarantee
  Chunked parallelism  Thread pool with 200-query chunks   Bounded memory + CPU utilization
  Query masking        QueryMasker before storage          Prevent sensitive data leakage

Key Files Quick Reference

  What you want to do                           Start here
  Understand the lineage workflow               workflow/lineage.py
  See how query logs are fetched and filtered   source/database/lineage_source.py
  See database-specific filters                 source/database/{dialect}/lineage.py
  Read the SQL parser                           lineage/parser.py (LineageParser)
  Understand graph-based analysis               lineage/sql_lineage.py (get_lineage_by_query())
  See query masking                             lineage/masker.py
  See dialect mappings                          lineage/models.py
  See stored procedure processing               source/database/lineage_processors.py
  See view lineage                              source/database/lineage_processors.py (view_lineage_processor())
  See external table lineage                    source/database/external_table_lineage_mixin.py
  See how lineage is published                  ometa/mixins/lineage_mixin.py (add_lineage())
  See lineage sink logic                        sink/metadata_rest.py (write_lineage())
All paths above are relative to ingestion/src/metadata/ingestion/. For example, lineage/parser.py means ingestion/src/metadata/ingestion/lineage/parser.py.