
Usage Ingestion

Usage ingestion collects query logs from databases, parses them, aggregates usage counts per table, and publishes the results to OpenMetadata. It answers questions like “which tables are most queried?” and “who is using this table?”. Unlike metadata ingestion (Source → Sink), usage follows a four-stage pipeline: Source → Processor → Stage → BulkSink.

Pipeline Overview

┌──────────────────────────────────────────────────────────┐
│  1. UsageSource                                          │
│  source/database/usage_source.py                         │
│                                                          │
│  Fetches raw query logs from database system views       │
│  (or from a CSV file via queryLogFilePath)               │
│                                                          │
│  Yields: TableQueries (batch of TableQuery)              │
└──────────────────────┬───────────────────────────────────┘


┌──────────────────────────────────────────────────────────┐
│  2. QueryParserProcessor                                 │
│  processor/query_parser.py                               │
│                                                          │
│  Parses each SQL query to extract:                       │
│  • Referenced tables                                     │
│  • JOIN conditions (which columns join which tables)     │
│  • Query type (SELECT, INSERT, UPDATE, DELETE)           │
│                                                          │
│  Yields: QueryParserData (batch of ParsedData)           │
└──────────────────────┬───────────────────────────────────┘


┌──────────────────────────────────────────────────────────┐
│  3. TableUsageStage                                      │
│  stage/table_usage.py                                    │
│                                                          │
│  Aggregates parsed queries by (table, date):             │
│  • Increments usage count                                │
│  • Collects JOIN information                             │
│  • Groups query costs by query hash                      │
│                                                          │
│  Writes: JSON Lines files to staging directory           │
└──────────────────────┬───────────────────────────────────┘


┌──────────────────────────────────────────────────────────┐
│  4. MetadataUsageBulkSink                                │
│  bulksink/metadata_usage.py                              │
│                                                          │
│  Reads staged files and publishes:                       │
│  • Table usage counts                                    │
│  • Frequently joined tables                              │
│  • Query entities (linked to tables)                     │
│  • Query cost records                                    │
│  • Life cycle data (created/modified/accessed dates)     │
│  • Usage percentiles                                     │
│                                                          │
│  Calls: POST /api/v1/usage/table/{id}                    │
└──────────────────────────────────────────────────────────┘

Data Model Flow

Each stage transforms the data through a chain of models:
TableQuery                              # Raw query from database logs
  ├── query: str                        # SQL text
  ├── userName: str
  ├── startTime / endTime / duration
  ├── cost: float (optional)
  ├── databaseName / databaseSchema
  └── analysisDate: timestamp

        ▼ QueryParserProcessor
ParsedData                              # Parsed single query
  ├── tables: List[str]                 # All referenced tables
  ├── joins: Dict[table → columns]      # JOIN information
  ├── query_type: str                   # SELECT, INSERT, etc.
  ├── sql: str                          # Cleaned SQL
  └── userName, date, duration, cost

        ▼ TableUsageStage (aggregation)
TableUsageCount                         # Per table, per date
  ├── table: str
  ├── date: str
  ├── count: int                        # Number of queries
  ├── sqlQueries: List[str]
  └── joins: List[TableColumnJoin]

        ▼ MetadataUsageBulkSink
UsageRequest → POST /api/v1/usage/table/{id}
TableJoins   → POST frequently joined tables
Query entity → POST /api/v1/queries
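
The transformation chain above can be sketched with simplified dataclasses. These are illustrative subsets only; the real models are Pydantic classes with more fields and validation.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Illustrative subsets of the models above -- the real classes are
# Pydantic models with additional fields and validation.

@dataclass
class TableQuery:                 # raw query from the database logs
    query: str
    userName: Optional[str] = None
    duration: Optional[float] = None
    cost: Optional[float] = None
    analysisDate: str = ""

@dataclass
class ParsedData:                 # one parsed query
    tables: List[str]
    query_type: str
    sql: str
    joins: Dict[str, list] = field(default_factory=dict)

@dataclass
class TableUsageCount:            # aggregated per (table, date)
    table: str
    date: str
    count: int = 0
    sqlQueries: List[str] = field(default_factory=list)
```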

Stage 1: Query Log Collection

UsageSource (source/database/usage_source.py) extends QueryParserSource and fetches raw query logs. Two modes:

Database Query Mode

Each database connector provides SQL to fetch from system views:
  Database     System View
  PostgreSQL   pg_stat_statements
  Snowflake    SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
  BigQuery     INFORMATION_SCHEMA.JOBS
  MySQL        performance_schema.events_statements_summary_by_digest
  Redshift     stl_querytext + stl_query
Configuration controls:
  • queryLogDuration — how many days back to look (default: 1)
  • resultLimit — max queries to fetch (default: 100,000)
  • filterCondition — custom SQL WHERE clause

File Mode

Set queryLogFilePath to a CSV file containing query logs. The file is parsed into TableQuery objects.
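
As an illustration of file mode, CSV rows map to query records like this. The column names below are hypothetical, not the connector's actual expected header; check the connector docs for the real file format.

```python
import csv
import io

# Hypothetical CSV layout -- the real column names are defined by
# the OpenMetadata query-log file format.
sample = io.StringIO(
    'query_text,user_name\n'
    '"SELECT id FROM orders",alice\n'
    '"SELECT name FROM customers",bob\n'
)

# Each row becomes a dict standing in for one TableQuery.
queries = [
    {"query": row["query_text"], "userName": row["user_name"]}
    for row in csv.DictReader(sample)
]
```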

Stage 2: SQL Parsing

QueryParserProcessor (processor/query_parser.py) calls parse_sql_statement() for each query.

Parsing Pipeline

parse_sql_statement(record: TableQuery)
  1. Clean the SQL (remove comments, normalize whitespace)
  2. Create LineageParser(query, dialect)
     ├── Try SqlGlot analyzer (30s timeout)
     ├── Fallback to SqlFluff analyzer (30s timeout)
     └── Fallback to SqlParse analyzer
  3. Extract: involved_tables, table_joins, query_type
  4. Format table names with get_formatted_entity_name()
  5. Return ParsedData
The LineageParser (lineage/parser.py) is shared with lineage ingestion — usage ingestion uses the table extraction capabilities, while lineage ingestion additionally uses the source/target direction analysis.
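
The cascading fallback can be sketched with the standard library. The analyzer callables here are hypothetical stand-ins for the SqlGlot/SqlFluff/SqlParse backends:

```python
from concurrent.futures import ThreadPoolExecutor

def parse_with_fallback(sql, analyzers, timeout=30.0):
    """Try each analyzer in order; fall through on error or timeout.

    Note: a timed-out analyzer keeps running in its worker thread,
    so this sketches the control flow, not a hard kill.
    """
    with ThreadPoolExecutor(max_workers=len(analyzers)) as pool:
        for analyzer in analyzers:
            try:
                return pool.submit(analyzer, sql).result(timeout=timeout)
            except Exception:  # parse error or concurrent.futures.TimeoutError
                continue
    return None
```

Called as `parse_with_fallback(sql, [sqlglot_analyzer, sqlfluff_analyzer, sqlparse_analyzer])`, it returns the first successful result, or None if every backend fails.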

What Gets Extracted

From a query like:
SELECT o.id, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.date > '2024-01-01'
The parser extracts:
  • tables: ["orders", "customers"]
  • joins: {"orders": [{"columnName": "customer_id", "joinedWith": {"table": "customers", "column": "id"}}]}
  • query_type: "SELECT"

Stage 3: Aggregation

TableUsageStage (stage/table_usage.py) groups parsed queries and writes to temporary files.

Aggregation Logic

for each ParsedData:
    for each table in parsed.tables:
        key = (table, date)
        usage_map[key].count += 1
        usage_map[key].joins.extend(parsed.joins)
        usage_map[key].sqlQueries.append(parsed.sql)

    # Separately aggregate query costs
    cost_key = (query_hash, date)
    cost_map[cost_key].cost += parsed.cost
    cost_map[cost_key].totalDuration += parsed.duration
    cost_map[cost_key].count += 1
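
The loop above maps to plain Python roughly as follows (ParsedData records are represented as dicts in this sketch):

```python
from collections import defaultdict

def aggregate_usage(parsed_records):
    """Roll parsed queries up into per-(table, date) usage buckets."""
    usage = defaultdict(lambda: {"count": 0, "sqlQueries": [], "joins": []})
    for parsed in parsed_records:
        for table in parsed["tables"]:
            bucket = usage[(table, parsed["date"])]
            bucket["count"] += 1
            bucket["sqlQueries"].append(parsed["sql"])
            bucket["joins"].extend(parsed.get("joins", []))
    return dict(usage)
```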

Staging Files

Two types of JSON Lines files are written to the staging directory:
  • Usage files: {serviceName}_{date} — one TableUsageCount per line
  • Cost files: {serviceName}_{date}_query — one QueryCostWrapper per line
This staged approach keeps memory usage bounded — records are written incrementally rather than held in memory.
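
A minimal sketch of the staging format: each record is appended as a single JSON line, and reading back streams one line at a time, so neither side holds the whole file in memory. File naming here is illustrative.

```python
import json

def stage_record(path, record):
    """Append one record to a staging file as a single JSON line."""
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")

def read_staged(path):
    """Stream staged records back one at a time."""
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            yield json.loads(line)
```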

Stage 4: Publishing

MetadataUsageBulkSink (bulksink/metadata_usage.py) reads the staged files and publishes to OpenMetadata.

For Each Table Usage Record

  1. Resolve table entity — look up table by FQN via ES search + API fallback
  2. Publish usage count — POST /api/v1/usage/table/{id} with UsageRequest(date, count)
  3. Publish joins — send TableJoins with column-level join information (powers “Frequently Joined With” in the UI)
  4. Create Query entities — create/link Query entities to the table
  5. Update life cycle — infer created/modified/accessed dates from query types:
    • SELECT → accessed
    • INSERT/UPDATE → modified
    • CREATE → created

After All Records

  • Compute percentiles — POST /api/v1/usage/compute.percentile/{entityType}/{date} ranks tables by usage

Query Cost Records

For databases that report cost (Snowflake, BigQuery):
  • Creates QueryCostRecord entities linked to queries by hash
  • Tracks: total cost, execution count, total duration

Life Cycle Integration

LifeCycleQueryMixin (source/database/life_cycle_query_mixin.py) infers table lifecycle from query patterns:
  Query Type                       Life Cycle Event
  CREATE TABLE                     Created
  INSERT, UPDATE, DELETE, MERGE    Modified
  SELECT                           Accessed
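
The inference can be sketched as a lookup that keeps the most recent timestamp per event. The record shape and ISO-string timestamps below are illustrative:

```python
# Illustrative mapping from query type to life cycle event.
LIFE_CYCLE_EVENT = {
    "CREATE": "created",
    "INSERT": "modified",
    "UPDATE": "modified",
    "DELETE": "modified",
    "MERGE": "modified",
    "SELECT": "accessed",
}

def infer_life_cycle(queries):
    """Keep the latest timestamp seen for each inferred event."""
    events = {}
    for q in queries:
        event = LIFE_CYCLE_EVENT.get(q["query_type"])
        if event and q["at"] > events.get(event, ""):
            events[event] = q["at"]
    return events
```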

Configuration

source:
  type: postgres-usage           # Database-specific usage source
  serviceName: my_postgres
  sourceConfig:
    config:
      type: DatabaseUsage
      queryLogDuration: 1        # Days to look back
      resultLimit: 100000        # Max queries to fetch
      queryLogFilePath: ""       # Optional: CSV file path
      filterCondition: ""        # Custom SQL WHERE clause

processor:
  type: query-parser
  config: {}

stage:
  type: table-usage
  config:
    filename: /tmp/usage_staging  # Staging directory

bulkSink:
  type: metadata-usage
  config:
    filename: /tmp/usage_staging  # Same staging directory

Performance

  • Query hash caching — 5,000-entry LRU cache for query lookups
  • Parser timeout — 30 seconds max per query
  • Memory limits — 100MB max per query parsing session
  • Staged processing — files written incrementally, not held in memory
  • Batch size control — configurable resultLimit per database
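
The query-hash cache can be sketched with functools.lru_cache sized to match the figure above; the lookup body is a hypothetical stand-in for the real API call:

```python
from functools import lru_cache

@lru_cache(maxsize=5000)
def lookup_query_by_checksum(checksum: str):
    """Stand-in for an expensive API lookup keyed by query hash.

    Repeated checksums are served from the cache instead of
    re-issuing the lookup.
    """
    return {"checksum": checksum}
```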

Key Design Patterns

  Pattern               Where                                          Why
  Four-stage pipeline   Source → Processor → Stage → BulkSink          Separates concerns: collection, parsing, aggregation, publishing
  Staged aggregation    TableUsageStage writes JSON Lines              Bounded memory usage for large query volumes
  Shared parser         LineageParser used by both usage and lineage   Single SQL parsing engine, no duplication
  Cascading parsers     SqlGlot → SqlFluff → SqlParse                  Maximize parsing success across SQL dialects

Key Files Quick Reference

  What you want to do                 Start here
  Understand the usage workflow       workflow/usage.py
  See how query logs are fetched      source/database/usage_source.py
  See database-specific query SQL     source/database/{dialect}/usage.py
  Read the SQL parser                 processor/query_parser.py — parse_sql_statement()
  Understand aggregation logic        stage/table_usage.py
  See how results are published       bulksink/metadata_usage.py
  Read the underlying SQL parser      lineage/parser.py — LineageParser
  See usage API methods               ometa/mixins/table_mixin.py — publish_table_usage()
  See life cycle inference            source/database/life_cycle_query_mixin.py
All paths above are relative to ingestion/src/metadata/ingestion/. For example, processor/query_parser.py means ingestion/src/metadata/ingestion/processor/query_parser.py.