# Usage Ingestion | Technical Architecture

> Technical deep dive into the OpenMetadata usage ingestion pipeline — query log collection, SQL parsing, aggregation, and publishing.

# Usage Ingestion

Usage ingestion collects **query logs** from databases, parses them, aggregates usage counts per table, and publishes the results to OpenMetadata. It answers questions like "which tables are most queried?" and "who is using this table?".

Unlike metadata ingestion (Source → Sink), usage follows a **4-stage pipeline**: Source → Processor → Stage → BulkSink.

## Pipeline Overview

```
┌──────────────────────────────────────────────────────────┐
│  1. UsageSource                                          │
│  source/database/usage_source.py                         │
│                                                          │
│  Fetches raw query logs from database system views       │
│  (or from a CSV file via queryLogFilePath)               │
│                                                          │
│  Yields: TableQueries (batch of TableQuery)              │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  2. QueryParserProcessor                                 │
│  processor/query_parser.py                               │
│                                                          │
│  Parses each SQL query to extract:                       │
│  • Referenced tables                                     │
│  • JOIN conditions (which columns join which tables)     │
│  • Query type (SELECT, INSERT, UPDATE, DELETE)           │
│                                                          │
│  Yields: QueryParserData (batch of ParsedData)           │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  3. TableUsageStage                                      │
│  stage/table_usage.py                                    │
│                                                          │
│  Aggregates parsed queries by (table, date):             │
│  • Increments usage count                                │
│  • Collects JOIN information                             │
│  • Groups query costs by query hash                      │
│                                                          │
│  Writes: JSON Lines files to staging directory           │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  4. MetadataUsageBulkSink                                │
│  bulksink/metadata_usage.py                              │
│                                                          │
│  Reads staged files and publishes:                       │
│  • Table usage counts                                    │
│  • Frequently joined tables                              │
│  • Query entities (linked to tables)                     │
│  • Query cost records                                    │
│  • Life cycle data (created/modified/accessed dates)     │
│  • Usage percentiles                                     │
│                                                          │
│  Calls: POST /api/v1/usage/table/{id}                    │
└──────────────────────────────────────────────────────────┘
```
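At its simplest, the four-stage chain can be pictured as functions feeding one another. The sketch below is a toy under stated assumptions: records are `(date, sql)` tuples rather than `TableQuery` objects, "parsing" just reads the word after `FROM`/`JOIN`, and all function names are hypothetical, not the framework's classes.

```python
from collections import Counter
from typing import Dict, Iterable, Iterator, List, Tuple

# Hypothetical, heavily simplified stand-ins for the four stages.
# Records are (date, sql) tuples instead of TableQuery objects.


def usage_source(log: Iterable[Tuple[str, str]]) -> Iterator[Tuple[str, str]]:
    """Stage 1: yield raw query-log records."""
    yield from log


def query_parser(records: Iterable[Tuple[str, str]]) -> Iterator[Tuple[str, List[str]]]:
    """Stage 2: 'parse' each query; naively takes the word after FROM/JOIN."""
    for date, sql in records:
        words = sql.split()
        tables = [words[i + 1] for i, word in enumerate(words[:-1])
                  if word.upper() in ("FROM", "JOIN")]
        yield date, tables


def table_usage_stage(parsed: Iterable[Tuple[str, List[str]]]) -> Dict[Tuple[str, str], int]:
    """Stage 3: aggregate usage counts per (table, date)."""
    counts: Counter = Counter()
    for date, tables in parsed:
        for table in tables:
            counts[(table, date)] += 1
    return dict(counts)


def metadata_usage_bulk_sink(aggregated: Dict[Tuple[str, str], int]) -> List[dict]:
    """Stage 4: turn each aggregate into a publishable usage record."""
    return [{"table": table, "date": date, "count": count}
            for (table, date), count in sorted(aggregated.items())]
```

Usage: `metadata_usage_bulk_sink(table_usage_stage(query_parser(usage_source(log))))`. Stages 1 and 2 stream lazily, while stage 3 must consume its entire input before it can emit totals; that barrier is where the real pipeline hands off to the bulk sink through staging files.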

## Data Model Flow

Each stage transforms the data through a chain of models:

```
TableQuery                              # Raw query from database logs
  ├── query: str                        # SQL text
  ├── userName: str
  ├── startTime / endTime / duration
  ├── cost: float (optional)
  ├── databaseName / databaseSchema
  └── analysisDate: timestamp
        │
        ▼ QueryParserProcessor
ParsedData                              # Parsed single query
  ├── tables: List[str]                 # All referenced tables
  ├── joins: Dict[table → columns]      # JOIN information
  ├── query_type: str                   # SELECT, INSERT, etc.
  ├── sql: str                          # Cleaned SQL
  └── userName, date, duration, cost
        │
        ▼ TableUsageStage (aggregation)
TableUsageCount                         # Per table, per date
  ├── table: str
  ├── date: str
  ├── count: int                        # Number of queries
  ├── sqlQueries: List[str]
  └── joins: List[TableColumnJoin]
        │
        ▼ MetadataUsageBulkSink
UsageRequest → POST /api/v1/usage/table/{id}
TableJoins   → POST frequently joined tables
Query entity → POST /api/v1/queries
```

## Stage 1: Query Log Collection

`UsageSource` (`source/database/usage_source.py`) extends `QueryParserSource` and fetches raw query logs. Two modes:

### Database Query Mode

Each database connector provides SQL to fetch from system views:

| Database   | System View                                              |
| ---------- | -------------------------------------------------------- |
| PostgreSQL | `pg_stat_statements`                                     |
| Snowflake  | `SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY`                  |
| BigQuery   | `INFORMATION_SCHEMA.JOBS`                                |
| MySQL      | `performance_schema.events_statements_summary_by_digest` |
| Redshift   | `stl_querytext` + `stl_query`                            |

Configuration controls:

* `queryLogDuration` — how many days back to look (default: 1)
* `resultLimit` — max queries to fetch (default: 100,000)
* `filterCondition` — custom SQL WHERE clause
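To make the three controls concrete, here is a minimal sketch of how they could shape a query-log statement. It targets Snowflake's `QUERY_HISTORY` view from the table above, but the helper name `build_query_log_sql`, the column list, and the time-window logic are illustrative assumptions, not the SQL that OpenMetadata's connectors actually issue.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_query_log_sql(
    query_log_duration: int = 1,
    result_limit: int = 100_000,
    filter_condition: Optional[str] = None,
    now: Optional[datetime] = None,
) -> str:
    """Hypothetical helper: render a query-log SELECT from the usage config."""
    end = now or datetime.now(timezone.utc)
    start = end - timedelta(days=query_log_duration)  # queryLogDuration
    predicates = [
        f"start_time >= '{start:%Y-%m-%d %H:%M:%S}'",
        f"start_time < '{end:%Y-%m-%d %H:%M:%S}'",
    ]
    if filter_condition:  # filterCondition is trusted config, pasted verbatim
        predicates.append(f"({filter_condition})")
    return (
        "SELECT query_text, user_name, start_time, end_time\n"
        "FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY\n"
        f"WHERE {' AND '.join(predicates)}\n"
        f"LIMIT {result_limit}"  # resultLimit
    )
```

Wrapping the custom condition in parentheses keeps an `OR` inside `filterCondition` from escaping the time window.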

### File Mode

Set `queryLogFilePath` to a CSV file containing query logs. The file is parsed into `TableQuery` objects.
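A minimal sketch of that conversion, assuming a CSV header with `query_text`, `user_name`, `database_name`, and `schema_name` columns (check the connector documentation for the exact header the file source expects). The `TableQuery` dataclass here is a simplified stand-in, not the generated OpenMetadata model.

```python
import csv
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass
class TableQuery:
    """Simplified stand-in for OpenMetadata's TableQuery model."""
    query: str
    userName: Optional[str] = None
    databaseName: Optional[str] = None
    databaseSchema: Optional[str] = None


def read_query_log(lines: Iterable[str]) -> Iterator[TableQuery]:
    """Parse CSV rows into TableQuery objects (column names are assumed)."""
    for row in csv.DictReader(lines):
        query = (row.get("query_text") or "").strip()
        if not query:
            continue  # skip rows without SQL text
        yield TableQuery(
            query=query,
            userName=row.get("user_name") or None,
            databaseName=row.get("database_name") or None,
            databaseSchema=row.get("schema_name") or None,
        )
```

Using the `csv` module rather than splitting on commas matters here: SQL text routinely contains commas, so queries must be quoted in the file and unquoted by the reader.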

## Stage 2: SQL Parsing

`QueryParserProcessor` (`processor/query_parser.py`) calls `parse_sql_statement()` for each query.

### Parsing Pipeline

```text theme={null}
parse_sql_statement(record: TableQuery)
  1. Clean the SQL (remove comments, normalize whitespace)
  2. Create LineageParser(query, dialect)
     ├── Try SqlGlot analyzer (30s timeout)
     ├── Fallback to SqlFluff analyzer (30s timeout)
     └── Fallback to SqlParse analyzer
  3. Extract: involved_tables, table_joins, query_type
  4. Format table names with get_formatted_entity_name()
  5. Return ParsedData
```
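The cascade in step 2 amounts to "try each analyzer in order under a time budget and keep the first that succeeds". The sketch below shows that pattern with the standard library only; `parse_with_fallback` and the analyzer callables are hypothetical placeholders, not the SqlGlot, SqlFluff, or SqlParse integrations inside `LineageParser`.

```python
import concurrent.futures
from typing import Callable, List, Optional, Sequence, Tuple

Analyzer = Callable[[str], List[str]]  # SQL text -> referenced tables


def parse_with_fallback(
    sql: str,
    analyzers: Sequence[Tuple[str, Analyzer]],
    timeout_s: float = 30.0,
) -> Tuple[Optional[str], List[str]]:
    """Return (analyzer_name, tables) from the first analyzer that succeeds."""
    for name, analyzer in analyzers:
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = pool.submit(analyzer, sql)
        try:
            return name, future.result(timeout=timeout_s)
        except Exception:  # timeout or parse error: fall through to the next one
            continue
        finally:
            # Don't block on a timed-out worker. A Python thread cannot be killed,
            # so it keeps running in the background until it finishes on its own.
            pool.shutdown(wait=False)
    return None, []  # every analyzer failed
```

A thread-based timeout only stops *waiting*; it does not reclaim the CPU spent by a runaway parse. Enforcing a hard limit would need a separate process or a signal-based timeout.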

The `LineageParser` (`lineage/parser.py`) is shared with [lineage ingestion](/v1.12.x/developers/contribute/codebase-deep-dives/lineage-ingestion) — usage ingestion uses the **table extraction** capabilities, while lineage ingestion additionally uses the **source/target direction** analysis.

### What Gets Extracted

From a query like:

```sql theme={null}
SELECT o.id, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.date > '2024-01-01'
```

The parser extracts:

* **tables**: `["orders", "customers"]`
* **joins**: `{"orders": [{"columnName": "customer_id", "joinedWith": {"table": "customers", "column": "id"}}]}`
* **query\_type**: `"SELECT"`
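To show the shape of that output, here is a deliberately naive regex extractor that reproduces the example above. It is a toy and nothing like the real `LineageParser`: it ignores CTEs, subqueries, quoted identifiers, and comments, and the names `extract`, `TABLE_RE`, and `JOIN_ON_RE` are hypothetical.

```python
import re
from typing import Dict, List

# Words that may follow a table name but are never aliases.
KEYWORDS = ("WHERE", "JOIN", "INNER", "LEFT", "RIGHT", "FULL", "CROSS",
            "ON", "GROUP", "ORDER", "LIMIT", "UNION")
# FROM/JOIN <table> [[AS] <alias>], where the alias must not be a keyword.
TABLE_RE = re.compile(
    r"\b(?:FROM|JOIN)\s+([\w.]+)"
    r"(?:\s+(?:AS\s+)?(?!(?:" + "|".join(KEYWORDS) + r")\b)(\w+))?",
    re.IGNORECASE,
)
# ON <alias>.<column> = <alias>.<column>
JOIN_ON_RE = re.compile(r"\bON\s+(\w+)\.(\w+)\s*=\s*(\w+)\.(\w+)", re.IGNORECASE)


def extract(sql: str) -> dict:
    aliases: Dict[str, str] = {}
    tables: List[str] = []
    for match in TABLE_RE.finditer(sql):
        table, alias = match.group(1), match.group(2) or match.group(1)
        aliases[alias] = table  # an unaliased table refers to itself
        if table not in tables:
            tables.append(table)

    joins: Dict[str, List[dict]] = {}
    for left_alias, left_col, right_alias, right_col in JOIN_ON_RE.findall(sql):
        left = aliases.get(left_alias, left_alias)
        right = aliases.get(right_alias, right_alias)
        joins.setdefault(left, []).append(
            {"columnName": left_col, "joinedWith": {"table": right, "column": right_col}}
        )

    query_type = sql.split(None, 1)[0].upper() if sql.strip() else ""
    return {"tables": tables, "joins": joins, "query_type": query_type}
```

Resolving aliases (`o` to `orders`, `c` to `customers`) before recording joins is the step that makes join information attributable to real tables.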

## Stage 3: Aggregation

`TableUsageStage` (`stage/table_usage.py`) groups parsed queries and writes to temporary files.

### Aggregation Logic

```python theme={null}
# Simplified view of the TableUsageStage loop (names are illustrative)
for parsed in parsed_queries:                    # one ParsedData per query
    for table in parsed.tables:
        key = (table, parsed.date)
        usage_map[key].count += 1
        usage_map[key].joins.extend(parsed.joins.get(table, []))
        usage_map[key].sqlQueries.append(parsed.sql)

    # Separately aggregate query costs, keyed by a hash of the SQL text
    cost_key = (query_hash(parsed.sql), parsed.date)
    cost_map[cost_key].cost += parsed.cost or 0
    cost_map[cost_key].totalDuration += parsed.duration or 0
    cost_map[cost_key].count += 1
```
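A self-contained, runnable version of that loop follows. The dataclasses are simplified stand-ins for the real models, and `query_hash` (MD5 of whitespace-normalized SQL) is a hypothetical choice made only so that identical queries share a cost bucket; the real hashing may differ.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class ParsedData:  # simplified stand-in
    tables: List[str]
    joins: Dict[str, List[dict]]
    sql: str
    date: str
    cost: Optional[float] = None
    duration: Optional[float] = None


@dataclass
class TableUsageCount:  # simplified stand-in
    table: str
    date: str
    count: int = 0
    sqlQueries: List[str] = field(default_factory=list)
    joins: List[dict] = field(default_factory=list)


@dataclass
class QueryCost:  # hypothetical cost accumulator
    queryHash: str
    date: str
    cost: float = 0.0
    totalDuration: float = 0.0
    count: int = 0


def query_hash(sql: str) -> str:
    """Hypothetical hash: MD5 of the whitespace-normalized SQL text."""
    return hashlib.md5(" ".join(sql.split()).encode("utf-8")).hexdigest()


def aggregate(
    records: List[ParsedData],
) -> Tuple[Dict[Tuple[str, str], TableUsageCount], Dict[Tuple[str, str], QueryCost]]:
    usage: Dict[Tuple[str, str], TableUsageCount] = {}
    costs: Dict[Tuple[str, str], QueryCost] = {}
    for parsed in records:
        for table in parsed.tables:  # one usage bucket per (table, date)
            key = (table, parsed.date)
            if key not in usage:
                usage[key] = TableUsageCount(table=table, date=parsed.date)
            entry = usage[key]
            entry.count += 1
            entry.joins.extend(parsed.joins.get(table, []))
            entry.sqlQueries.append(parsed.sql)

        cost_key = (query_hash(parsed.sql), parsed.date)  # one bucket per query text
        if cost_key not in costs:
            costs[cost_key] = QueryCost(queryHash=cost_key[0], date=parsed.date)
        cost = costs[cost_key]
        cost.cost += parsed.cost or 0.0
        cost.totalDuration += parsed.duration or 0.0
        cost.count += 1
    return usage, costs
```

Note the asymmetry: a query touching two tables adds one to *each* table's usage count, but only one execution to its cost bucket.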

### Staging Files

Two types of JSON Lines files are written to the staging directory:

* **Usage files**: `{serviceName}_{date}` — one `TableUsageCount` per line
* **Cost files**: `{serviceName}_{date}_query` — one `QueryCostWrapper` per line

This staged approach keeps memory usage bounded — records are written incrementally rather than held in memory.
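A minimal sketch of writing and reading those files, assuming the `{serviceName}_{date}` and `{serviceName}_{date}_query` naming above. The helper names and the append-mode choice are assumptions for illustration, not the stage's actual implementation.

```python
import json
import os
from typing import Iterable, Iterator, Tuple


def write_staging_files(
    staging_dir: str,
    service_name: str,
    date: str,
    usage_records: Iterable[dict],
    cost_records: Iterable[dict],
) -> Tuple[str, str]:
    """Append one JSON object per line to the usage and cost files for `date`."""
    os.makedirs(staging_dir, exist_ok=True)
    usage_path = os.path.join(staging_dir, f"{service_name}_{date}")
    cost_path = os.path.join(staging_dir, f"{service_name}_{date}_query")
    for path, records in ((usage_path, usage_records), (cost_path, cost_records)):
        with open(path, "a", encoding="utf-8") as fh:
            for record in records:
                fh.write(json.dumps(record) + "\n")
    return usage_path, cost_path


def read_staging_file(path: str) -> Iterator[dict]:
    """Stream records back one line at a time, as a bulk sink could."""
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            if line.strip():
                yield json.loads(line)
```

Because each line is an independent JSON document, the reader never needs to load a whole file to process it.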

## Stage 4: Publishing

`MetadataUsageBulkSink` (`bulksink/metadata_usage.py`) reads the staged files and publishes to OpenMetadata.

### For Each Table Usage Record

1. **Resolve table entity** — look up table by FQN via ES search + API fallback
2. **Publish usage count** — `POST /api/v1/usage/table/{id}` with `UsageRequest(date, count)`
3. **Publish joins** — send `TableJoins` with column-level join information (powers "Frequently Joined With" in the UI)
4. **Create Query entities** — create/link `Query` entities to the table
5. **Update life cycle** — infer created/modified/accessed dates from query types:
   * `SELECT` → accessed
   * `INSERT`/`UPDATE`/`DELETE`/`MERGE` → modified
   * `CREATE TABLE` → created
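Step 2 is a plain authenticated REST call. As a hedged illustration, the sketch below builds (but does not send) that request with `urllib.request`. The JSON body `{"date": ..., "count": ...}` follows the `UsageRequest(date, count)` description above; the helper name and the bearer-token header are assumptions. In the ingestion framework this call goes through the OpenMetadata client's `publish_table_usage()` rather than raw HTTP.

```python
import json
import urllib.request


def build_usage_request(base_url: str, table_id: str, date: str, count: int,
                        token: str) -> urllib.request.Request:
    """Hypothetical helper: a POST /api/v1/usage/table/{id} request for one day."""
    body = json.dumps({"date": date, "count": count}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/usage/table/{table_id}",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",  # e.g. an ingestion bot's JWT
        },
    )
```

Sending it would be `urllib.request.urlopen(req)`; keeping construction separate makes the payload easy to inspect and test.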

### After All Records

* **Compute percentiles** — `POST /api/v1/usage/compute.percentile/{entityType}/{date}` ranks tables by usage
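The percentile computation runs server-side, and its exact definition (tie handling, time window) is not described here. The sketch below uses one common definition purely to illustrate "ranks tables by usage": the share of tables with strictly lower usage. The function name is hypothetical.

```python
from bisect import bisect_left
from typing import Dict


def usage_percentiles(counts: Dict[str, int]) -> Dict[str, float]:
    """Percent of tables with strictly lower usage than each table (0-100)."""
    n = len(counts)
    values = sorted(counts.values())
    result: Dict[str, float] = {}
    for table, count in counts.items():
        lower = bisect_left(values, count)  # number of strictly smaller counts
        result[table] = 100.0 * lower / n
    return result
```

Sorting once and using binary search keeps this at O(n log n) instead of comparing every pair of tables.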

### Query Cost Records

For databases that report cost (Snowflake, BigQuery):

* Creates `QueryCostRecord` entities linked to queries by hash
* Tracks: total cost, execution count, total duration

## Life Cycle Integration

`LifeCycleQueryMixin` (`source/database/life_cycle_query_mixin.py`) infers table lifecycle from query patterns:

| Query Type                            | Life Cycle Event |
| ------------------------------------- | ---------------- |
| `CREATE TABLE`                        | Created          |
| `INSERT`, `UPDATE`, `DELETE`, `MERGE` | Modified         |
| `SELECT`                              | Accessed         |
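A minimal sketch of that mapping, assuming classification by the leading keyword(s) and keeping the latest timestamp per event. The function names are hypothetical, and the sketch deliberately skips forms such as `WITH ... SELECT` or `CREATE OR REPLACE TABLE`, which a real implementation would need to handle.

```python
from datetime import datetime
from typing import Dict, Iterable, Optional, Tuple

MODIFY_VERBS = {"INSERT", "UPDATE", "DELETE", "MERGE"}


def life_cycle_event(sql: str) -> Optional[str]:
    """Map a query's leading keywords to a life cycle event, per the table above."""
    tokens = sql.split()
    if not tokens:
        return None
    verb = tokens[0].upper()
    if verb == "CREATE" and len(tokens) > 1 and tokens[1].upper() == "TABLE":
        return "created"
    if verb in MODIFY_VERBS:
        return "modified"
    if verb == "SELECT":
        return "accessed"
    return None  # e.g. WITH ... SELECT, GRANT: not handled by this sketch


def latest_life_cycle(queries: Iterable[Tuple[str, datetime]]) -> Dict[str, datetime]:
    """Keep the most recent timestamp seen for each event type."""
    latest: Dict[str, datetime] = {}
    for sql, executed_at in queries:
        event = life_cycle_event(sql)
        if event and (event not in latest or executed_at > latest[event]):
            latest[event] = executed_at
    return latest
```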

## Configuration

```yaml theme={null}
source:
  type: postgres-usage           # Database-specific usage source
  serviceName: my_postgres
  # serviceConnection omitted for brevity
  sourceConfig:
    config:
      type: DatabaseUsage
      queryLogDuration: 1        # Days to look back
      resultLimit: 100000        # Max queries to fetch
      queryLogFilePath: ""       # Optional: CSV file path
      filterCondition: ""        # Custom SQL WHERE clause

processor:
  type: query-parser
  config: {}

stage:
  type: table-usage
  config:
    filename: /tmp/usage_staging  # Staging directory

bulkSink:
  type: metadata-usage
  config:
    filename: /tmp/usage_staging  # Same staging directory

# workflowConfig (OpenMetadata server connection) omitted for brevity
```

## Performance

* **Query hash caching** — 5,000-entry LRU cache for query lookups
* **Parser timeout** — 30 seconds max per query
* **Memory limits** — 100MB max per query parsing session
* **Staged processing** — files written incrementally, not held in memory
* **Result cap** — `resultLimit` bounds how many queries are fetched per run
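The query-hash cache can be pictured with `functools.lru_cache`, which gives a bounded least-recently-used cache in one decorator. In this sketch the lookup function, the `REMOTE_LOOKUPS` list that makes round-trips visible, and the MD5 checksum are all hypothetical; only the 5,000-entry bound comes from the list above.

```python
import functools
import hashlib

REMOTE_LOOKUPS = []  # records simulated server round-trips so cache hits are visible


def query_checksum(sql: str) -> str:
    """Hypothetical: checksum of the SQL text, used as the cache key."""
    return hashlib.md5(sql.encode("utf-8")).hexdigest()


@functools.lru_cache(maxsize=5000)  # bounded like the 5,000-entry cache above
def get_query_by_checksum(checksum: str) -> str:
    """Stand-in for fetching a Query entity from the server by its checksum."""
    REMOTE_LOOKUPS.append(checksum)
    return f"query:{checksum[:8]}"
```

Since query logs repeat the same statements many times, most lookups after the first are served from memory, and the size bound caps the cache's footprint.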

## Key Design Patterns

| Pattern                | Where                                          | Why                                                              |
| ---------------------- | ---------------------------------------------- | ---------------------------------------------------------------- |
| **4-stage pipeline**   | Source → Processor → Stage → BulkSink          | Separates concerns: collection, parsing, aggregation, publishing |
| **Staged aggregation** | `TableUsageStage` writes JSON Lines            | Bounded memory usage for large query volumes                     |
| **Shared parser**      | `LineageParser` used by both usage and lineage | Single SQL parsing engine, no duplication                        |
| **Cascading parsers**  | SqlGlot → SqlFluff → SqlParse                  | Maximize parsing success across SQL dialects                     |

## Key Files Quick Reference

| What you want to do             | Start here                                              |
| ------------------------------- | ------------------------------------------------------- |
| Understand the usage workflow   | `workflow/usage.py`                                     |
| See how query logs are fetched  | `source/database/usage_source.py`                       |
| See database-specific query SQL | `source/database/{dialect}/usage.py`                    |
| Read the SQL parser             | `processor/query_parser.py` → `parse_sql_statement()`   |
| Understand aggregation logic    | `stage/table_usage.py`                                  |
| See how results are published   | `bulksink/metadata_usage.py`                            |
| Read the underlying SQL parser  | `lineage/parser.py` → `LineageParser`                   |
| See usage API methods           | `ometa/mixins/table_mixin.py` → `publish_table_usage()` |
| See life cycle inference        | `source/database/life_cycle_query_mixin.py`             |

<Tip>
  All paths above are relative to `ingestion/src/metadata/ingestion/`. For example, `processor/query_parser.py` means `ingestion/src/metadata/ingestion/processor/query_parser.py`.
</Tip>
