
# Lineage Ingestion Advanced | Entity Resolution, Publishing & Configuration

> Deep dive into entity resolution, cross-database lineage, stored procedures, parallel processing, and the full AddLineageRequest lifecycle.

# Lineage Ingestion — Advanced Topics

This page covers entity resolution, cross-database lineage, stored procedure and view lineage, parallel processing, publishing, dialect mapping, and configuration. For the pipeline overview, SQL parsing, and graph analysis, see [Lineage Ingestion](/v1.12.x/developers/contribute/codebase-deep-dives/lineage-ingestion).

## Entity Resolution

Before publishing, table names from SQL must be resolved to OpenMetadata entities:

```python theme={null}
# search_table_entities(service_name, database, schema, table)
#   1. Search Elasticsearch (fast) for the table FQN
#   2. If not found, fall back to the API search (reliable)
#   3. If the schema is not found, retry without the schema qualifier
#   4. Cache results (1,000-entry LRU cache)
```

This resolution step is critical: lineage can only be created between entities that already exist in the catalog, so a table name that cannot be resolved yields no edge.
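The lookup order above can be sketched as a small cascade behind a bounded cache. This is a minimal illustration, not the OpenMetadata implementation: the two in-memory dictionaries stand in for Elasticsearch and the REST API, and `resolve_table`, `search_es`, `search_api` and `search_without_schema` are hypothetical names.

```python
from functools import lru_cache
from typing import Optional

# Hypothetical in-memory stand-ins for the two lookup backends (FQN -> entity id).
# The "fast" index is deliberately incomplete so the fallback path is exercised.
ES_INDEX = {"svc.db.public.orders": "id-orders"}
API_INDEX = {
    "svc.db.public.orders": "id-orders",
    "svc.db.sales.customers": "id-customers",
}


def search_es(fqn: str) -> Optional[str]:
    return ES_INDEX.get(fqn)


def search_api(fqn: str) -> Optional[str]:
    return API_INDEX.get(fqn)


def search_without_schema(service: str, database: str, table: str) -> Optional[str]:
    # Drop the schema qualifier: return the first table with this name in any schema.
    prefix, suffix = f"{service}.{database}.", f".{table}"
    for fqn, entity_id in API_INDEX.items():
        if fqn.startswith(prefix) and fqn.endswith(suffix):
            return entity_id
    return None


@lru_cache(maxsize=1000)  # bounded LRU cache, sized like the one described above
def resolve_table(
    service: str, database: str, schema: Optional[str], table: str
) -> Optional[str]:
    if schema:
        fqn = f"{service}.{database}.{schema}.{table}"
        # Steps 1 and 2: fast search first, then the reliable fallback.
        entity_id = search_es(fqn) or search_api(fqn)
        if entity_id:
            return entity_id
    # Step 3: schema missing or not found, so retry without it.
    return search_without_schema(service, database, table)
```

Because `lru_cache` keys on the full argument tuple, repeated references to the same table across thousands of queries hit the cache instead of the search backends.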

## Cross-Database Lineage

When `processCrossDatabaseLineage` is enabled:

```python theme={null}
# A query that references tables from two different services:
query = """
INSERT INTO warehouse.analytics.summary
SELECT * FROM production.sales.orders
"""

# Resolution then searches across all of the configured services:
search_table_entities(
    service_names=["warehouse_service", "production_service"],
    # ... remaining arguments elided
)
```

Configured via `crossDatabaseServiceNames` in the workflow config.
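A minimal sketch of how the flag and the service list could combine during resolution. The catalog dictionary, the helper names, and the choice to search the ingesting service first are all assumptions made for illustration.

```python
from typing import Optional, Sequence

# Hypothetical catalog: FQN -> entity id, spanning two database services.
CATALOG = {
    "warehouse_service.warehouse.analytics.summary": "id-summary",
    "production_service.production.sales.orders": "id-orders",
}


def candidate_services(
    current_service: str, cross_db_enabled: bool, cross_db_services: Sequence[str]
) -> list[str]:
    # Without cross-database lineage, only the ingesting service is searched.
    if not cross_db_enabled:
        return [current_service]
    # Assumed order: the current service first, then the configured extras.
    return [current_service] + [s for s in cross_db_services if s != current_service]


def resolve_across_services(
    service_names: Sequence[str], database: str, schema: str, table: str
) -> Optional[str]:
    """Try each service in order and return the first matching entity id."""
    for service in service_names:
        entity_id = CATALOG.get(f"{service}.{database}.{schema}.{table}")
        if entity_id is not None:
            return entity_id
    return None
```

With the flag off, `production.sales.orders` would not resolve from the warehouse service, and the edge in the example query would be dropped.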

## Stored Procedure Lineage

`lineage_processors.py` handles lineage from stored procedures:

1. Fetch stored procedure definitions
2. Extract individual SQL statements from the procedure body
3. Filter for lineage-relevant statements (`is_lineage_query()`)
4. Build a directed graph of the procedure's data flow
5. For each lineage edge, link to the stored procedure entity as the pipeline reference

The resulting lineage includes `pipeline=EntityReference(type="storedProcedure")` to show which procedure creates the data flow.
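The five steps can be sketched end to end with deliberately simple parts. Everything here is hypothetical: `WRITE_STATEMENT` stands in for `is_lineage_query()`, `toy_extract` is a regex toy rather than `LineageParser`, and the directed graph is flattened to a list of edges, each tagged with the procedure name in the role of the `pipeline` reference.

```python
import re
from dataclasses import dataclass

# Hypothetical stand-in for is_lineage_query(): keep statements that write data.
WRITE_STATEMENT = re.compile(
    r"^\s*(insert|merge|update|create\s+(or\s+replace\s+)?(table|view))\b",
    re.IGNORECASE,
)


@dataclass(frozen=True)
class Edge:
    source: str
    target: str
    pipeline: str  # the stored procedure that produces this data flow


def split_statements(body: str) -> list[str]:
    # Naive split on ';' that does not account for semicolons inside string literals.
    return [s.strip() for s in body.split(";") if s.strip()]


def is_lineage_statement(sql: str) -> bool:
    return WRITE_STATEMENT.match(sql) is not None


def toy_extract(sql: str) -> tuple[list[str], str]:
    # Toy regex "parser": target after INTO/TABLE/VIEW/UPDATE, sources after FROM/JOIN.
    match = re.search(r"\b(?:into|table|view|update)\s+([\w.]+)", sql, re.IGNORECASE)
    target = match.group(1) if match else ""
    sources = re.findall(r"\b(?:from|join)\s+([\w.]+)", sql, re.IGNORECASE)
    return sources, target


def procedure_edges(proc_name: str, body: str) -> list[Edge]:
    edges = []
    for stmt in split_statements(body):
        if not is_lineage_statement(stmt):
            continue  # e.g. SET, DECLARE, PRINT
        sources, target = toy_extract(stmt)
        edges.extend(Edge(src, target, proc_name) for src in sources)
    return edges
```

For a body such as `SET NOCOUNT ON; INSERT INTO stage.orders SELECT * FROM raw.orders; ...`, the `SET` statement is skipped and every remaining edge carries `sp_load` (or whichever procedure name was passed) as its pipeline.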

## View Lineage

Views are processed separately:

```python theme={null}
# view_lineage_processor(view_entity)
#   1. Get the view definition SQL
#   2. Parse it with LineageParser
#   3. Extract the source tables from the SELECT
#   4. Create lineage: source tables → view
#   5. Override existing view lineage (views can be redefined)
```
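Step 5 is what distinguishes views from query lineage. The sketch below shows why replacing beats merging when a view is redefined. It swaps in a naive regex for `LineageParser`, and `override_view_lineage` is a hypothetical name.

```python
import re

# Naive regex extraction, used instead of LineageParser purely for illustration.
SOURCE_TABLE = re.compile(r"\b(?:from|join)\s+([\w.]+)", re.IGNORECASE)


def extract_view_sources(view_definition: str) -> set[str]:
    """Tables read by the view's SELECT (toy version: FROM/JOIN targets only)."""
    return set(SOURCE_TABLE.findall(view_definition))


def override_view_lineage(
    lineage: dict[str, set[str]], view_fqn: str, view_definition: str
) -> None:
    # Replace, never merge: upstreams from an older definition must disappear.
    lineage[view_fqn] = extract_view_sources(view_definition)
```

If a view first reads `db.a` and `db.b` and is later redefined to read only `db.c`, a union would keep the stale `db.a` and `db.b` edges; overriding leaves just `db.c`.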

## External Table Lineage

`ExternalTableLineageMixin` (`source/database/external_table_lineage_mixin.py`) links external tables to the storage containers (S3 or GCS paths) that hold their data:

1. Search for container entities by storage path
2. Map columns between the container and the table
3. Set the lineage source to `LineageSource.ExternalTableLineage`
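The sketch below covers the first two steps under two assumptions that are not taken from the mixin: the container is chosen by the longest path prefix of the table's location, and columns are paired by case-insensitive name. The container index and function names are hypothetical.

```python
from typing import Optional

# Hypothetical container index: storage path prefix -> container FQN.
CONTAINERS = {
    "s3://lake/raw/": "s3_service.lake.raw",
    "s3://lake/raw/events/": "s3_service.lake.raw.events",
}


def find_container(location: str) -> Optional[str]:
    """Pick the container whose path is the longest prefix of the table location."""
    matches = [path for path in CONTAINERS if location.startswith(path)]
    return CONTAINERS[max(matches, key=len)] if matches else None


def map_columns(container_cols: list[str], table_cols: list[str]) -> list[tuple[str, str]]:
    """Pair container and table columns with the same name, ignoring case."""
    by_name = {col.lower(): col for col in table_cols}
    return [(col, by_name[col.lower()]) for col in container_cols if col.lower() in by_name]
```

Preferring the longest prefix keeps a table stored under `s3://lake/raw/events/2024/` attached to the more specific `events` container rather than its parent.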

## Parallel Processing

`LineageSource` processes queries in parallel for performance:

```
Query logs → Chunk into batches of 200
                    │
     ┌──────────────┼──────────────┐
     ▼              ▼              ▼
  Thread 1       Thread 2       Thread 3
  (chunk 1)      (chunk 2)      (chunk 3)
     │              │              │
     └──────────────┼──────────────┘
                    ▼
            Result queue → yield AddLineageRequest
```

Thread count is configurable (default: CPU count). Each thread gets its own parser instances.
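The diagram maps onto a standard thread-pool pattern. This sketch uses `concurrent.futures` with 200-query chunks and a `threading.local` parser per worker. `ToyParser` and the other names are hypothetical, and upper-casing the query stands in for real parsing.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import islice
from typing import Iterable, Iterator

CHUNK_SIZE = 200
_local = threading.local()


class ToyParser:
    """Hypothetical stand-in for a per-thread parser instance."""

    def parse(self, query: str) -> str:
        return query.upper()


def chunks(items: Iterable[str], size: int = CHUNK_SIZE) -> Iterator[list[str]]:
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch


def get_parser() -> ToyParser:
    # One parser per worker thread, so parser state is never shared across threads.
    if not hasattr(_local, "parser"):
        _local.parser = ToyParser()
    return _local.parser


def process_chunk(chunk: list[str]) -> list[str]:
    parser = get_parser()
    return [parser.parse(query) for query in chunk]


def parallel_lineage(queries: Iterable[str], threads: int = 4) -> Iterator[str]:
    with ThreadPoolExecutor(max_workers=threads) as pool:
        futures = [pool.submit(process_chunk, chunk) for chunk in chunks(queries)]
        for future in as_completed(futures):  # results arrive in completion order
            yield from future.result()
```

For simplicity this sketch submits every chunk up front; a version with strictly bounded memory would cap the number of in-flight futures.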

## AddLineageRequest Structure

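```python theme={null}
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import (
    ColumnLineage,
    EntitiesEdge,
    LineageDetails,
    Source as LineageSource,
)
from metadata.generated.schema.type.entityReference import EntityReference

AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=source_id, type="table"),
        toEntity=EntityReference(id=target_id, type="table"),
        lineageDetails=LineageDetails(
            sqlQuery="INSERT INTO target SELECT ...",  # Masked before storage
            source=LineageSource.QueryLineage,
            columnsLineage=[
                ColumnLineage(
                    fromColumns=["service.db.schema.source.col1"],  # column FQNs
                    toColumn="service.db.schema.target.col1",
                ),
            ],
            pipeline=EntityReference(...),  # Optional: stored procedure
        ),
    )
)
```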

## Publishing

The sink merges lineage with existing edges:

```python theme={null}
# add_lineage(request, check_patch=False)
#   1. Check whether the edge already exists
#   2. If it exists: merge column lineage (union of old and new columns)
#   3. If it is new: create the edge
#   4. Cache the result
```

For views, `write_override_lineage()` replaces existing lineage entirely (since view definitions can change).
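The merge rule for query lineage can be shown with plain dictionaries. This is a sketch, not the sink code: `LineageEdge` and `merge_edge` are hypothetical, and an edge is keyed by its `(from, to)` pair with column lineage stored as `toColumn -> set of fromColumns`.

```python
from dataclasses import dataclass, field


@dataclass
class LineageEdge:
    from_id: str
    to_id: str
    # toColumn -> set of fromColumns that feed it
    columns: dict[str, set[str]] = field(default_factory=dict)


def merge_edge(store: dict[tuple[str, str], LineageEdge], new: LineageEdge) -> LineageEdge:
    key = (new.from_id, new.to_id)
    existing = store.get(key)
    if existing is None:
        # New edge: create it as-is.
        store[key] = new
        return new
    # Existing edge: union the old and new column lineage.
    for to_col, from_cols in new.columns.items():
        existing.columns.setdefault(to_col, set()).update(from_cols)
    return existing
```

Two queries that both write `b.x`, one from `a.x` and one from `a.y`, therefore end up as a single edge whose `b.x` column lists both sources.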

## Dialect Mapping

`lineage/models.py` maps each database connection type to a SQL dialect for correct parsing:

| Connection Type         | Dialect   |
| ----------------------- | --------- |
| Postgres, Redshift      | POSTGRES  |
| MySQL, MariaDB          | MYSQL     |
| Snowflake               | SNOWFLAKE |
| BigQuery                | BIGQUERY  |
| SQL Server, Azure SQL   | TSQL      |
| Hive, Spark, Databricks | SPARKSQL  |
| Oracle                  | ORACLE    |
| Trino, Presto           | TRINO     |
| + 20 more               | ...       |
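Conceptually the mapping is a lookup table with a default. The sketch below mirrors only the rows above. The `Dialect` enum here is an illustrative subset rather than the real one in `lineage/models.py`, the string keys are the display names from the table, not the actual connection-type values, and the `ANSI` fallback for unmapped types is an assumption.

```python
from enum import Enum


class Dialect(Enum):
    # Illustrative subset mirroring the table above; not the real enum.
    ANSI = "ansi"  # assumed fallback for unmapped connection types
    POSTGRES = "postgres"
    MYSQL = "mysql"
    SNOWFLAKE = "snowflake"
    BIGQUERY = "bigquery"
    TSQL = "tsql"
    SPARKSQL = "sparksql"
    ORACLE = "oracle"
    TRINO = "trino"


CONNECTION_DIALECT = {
    "Postgres": Dialect.POSTGRES,
    "Redshift": Dialect.POSTGRES,
    "MySQL": Dialect.MYSQL,
    "MariaDB": Dialect.MYSQL,
    "Snowflake": Dialect.SNOWFLAKE,
    "BigQuery": Dialect.BIGQUERY,
    "SQL Server": Dialect.TSQL,
    "Azure SQL": Dialect.TSQL,
    "Hive": Dialect.SPARKSQL,
    "Spark": Dialect.SPARKSQL,
    "Databricks": Dialect.SPARKSQL,
    "Oracle": Dialect.ORACLE,
    "Trino": Dialect.TRINO,
    "Presto": Dialect.TRINO,
}


def dialect_for(connection_type: str) -> Dialect:
    return CONNECTION_DIALECT.get(connection_type, Dialect.ANSI)
```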

## Configuration

```yaml theme={null}
source:
  type: postgres-lineage          # Database-specific lineage source
  serviceName: my_postgres
  sourceConfig:
    config:
      type: DatabaseLineage
      queryLogDuration: 1           # Days to look back
      resultLimit: 100000           # Max queries to fetch
      parsingTimeoutLimit: 30       # Seconds per query
      processCrossDatabaseLineage: false
      crossDatabaseServiceNames: []
      threads: 4                    # Parallel processing threads
      queryLogFilePath: ""          # Optional: CSV file path
```
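To make the look-back setting concrete, here is a hypothetical typed view of the `config` block and the query-log window that `queryLogDuration` implies. Field names follow the YAML keys and defaults are copied from the example values above; `LineageConfig` and `query_window` are illustrative names, not OpenMetadata classes.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class LineageConfig:
    # camelCase kept so the YAML keys can be passed straight in as keyword arguments.
    queryLogDuration: int = 1  # days
    resultLimit: int = 100000
    parsingTimeoutLimit: int = 30  # seconds per query
    processCrossDatabaseLineage: bool = False
    crossDatabaseServiceNames: tuple[str, ...] = ()
    threads: int = 4


def query_window(cfg: LineageConfig, now: datetime) -> tuple[datetime, datetime]:
    """Start and end of the query-log window implied by queryLogDuration."""
    return now - timedelta(days=cfg.queryLogDuration), now


cfg = LineageConfig(**{"queryLogDuration": 3, "resultLimit": 500})
start, end = query_window(cfg, datetime(2024, 1, 10, tzinfo=timezone.utc))
```

With `queryLogDuration: 3`, a run at 2024-01-10 00:00 UTC would consider queries logged from 2024-01-07 00:00 UTC onward.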

## Key Design Patterns

| Pattern                 | Where                             | Why                                               |
| ----------------------- | --------------------------------- | ------------------------------------------------- |
| **Directed graph**      | NetworkX in `sql_lineage.py`      | Handle intermediate tables and complex data flows |
| **Cascade parsing**     | SqlGlot → SqlFluff → SqlParse     | Maximize success across SQL dialects              |
| **Entity resolution**   | ES + API fallback with LRU cache  | Fast lookup with reliability guarantee            |
| **Chunked parallelism** | Thread pool with 200-query chunks | Bounded memory + CPU utilization                  |
| **Query masking**       | `QueryMasker` before storage      | Prevent sensitive data leakage                    |
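To illustrate the query-masking row, here is a regex-based sketch that replaces string and numeric literals with `?` before a query is stored. It is a simplified stand-in and not necessarily how `QueryMasker` works; `mask_query` is a hypothetical name.

```python
import re

# Single-quoted string literals (allowing '' escapes) and standalone numbers.
_STRING = re.compile(r"'(?:[^']|'')*'")
_NUMBER = re.compile(r"\b\d+(?:\.\d+)?\b")


def mask_query(sql: str) -> str:
    """Replace literal values with '?' so stored queries carry no data values."""
    sql = _STRING.sub("?", sql)
    return _NUMBER.sub("?", sql)
```

Identifiers such as `t2` survive because the word-boundary anchors only match digits that stand alone. A parser-based masker handles edge cases, such as dialect-specific quoting, that a regex cannot.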

## Key Files Quick Reference

| What you want to do                         | Start here                                                           |
| ------------------------------------------- | -------------------------------------------------------------------- |
| Understand the lineage workflow             | `workflow/lineage.py`                                                |
| See how query logs are fetched and filtered | `source/database/lineage_source.py`                                  |
| See database-specific filters               | `source/database/{dialect}/lineage.py`                               |
| Read the SQL parser                         | `lineage/parser.py` → `LineageParser`                                |
| Understand graph-based analysis             | `lineage/sql_lineage.py` → `get_lineage_by_query()`                  |
| See query masking                           | `lineage/masker.py`                                                  |
| See dialect mappings                        | `lineage/models.py`                                                  |
| See stored procedure processing             | `source/database/lineage_processors.py`                              |
| See view lineage                            | `source/database/lineage_processors.py` → `view_lineage_processor()` |
| See external table lineage                  | `source/database/external_table_lineage_mixin.py`                    |
| See how lineage is published                | `ometa/mixins/lineage_mixin.py` → `add_lineage()`                    |
| See lineage sink logic                      | `sink/metadata_rest.py` → `write_lineage()`                          |

<Tip>
  All paths above are relative to `ingestion/src/metadata/ingestion/`. For example, `lineage/parser.py` means `ingestion/src/metadata/ingestion/lineage/parser.py`.
</Tip>
