Usage Ingestion
Usage ingestion collects query logs from databases, parses them, aggregates usage counts per table, and publishes the results to OpenMetadata. It answers questions like “which tables are most queried?” and “who is using this table?”.
Unlike metadata ingestion (Source → Sink), usage follows a four-stage pipeline: Source → Processor → Stage → BulkSink.
Pipeline Overview
┌──────────────────────────────────────────────────────────┐
│ 1. UsageSource │
│ source/database/usage_source.py │
│ │
│ Fetches raw query logs from database system views │
│ (or from a CSV file via queryLogFilePath) │
│ │
│ Yields: TableQueries (batch of TableQuery) │
└──────────────────────┬───────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 2. QueryParserProcessor │
│ processor/query_parser.py │
│ │
│ Parses each SQL query to extract: │
│ • Referenced tables │
│ • JOIN conditions (which columns join which tables) │
│ • Query type (SELECT, INSERT, UPDATE, DELETE) │
│ │
│ Yields: QueryParserData (batch of ParsedData) │
└──────────────────────┬───────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 3. TableUsageStage │
│ stage/table_usage.py │
│ │
│ Aggregates parsed queries by (table, date): │
│ • Increments usage count │
│ • Collects JOIN information │
│ • Groups query costs by query hash │
│ │
│ Writes: JSON Lines files to staging directory │
└──────────────────────┬───────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 4. MetadataUsageBulkSink │
│ bulksink/metadata_usage.py │
│ │
│ Reads staged files and publishes: │
│ • Table usage counts │
│ • Frequently joined tables │
│ • Query entities (linked to tables) │
│ • Query cost records │
│ • Life cycle data (created/modified/accessed dates) │
│ • Usage percentiles │
│ │
│ Calls: POST /api/v1/usage/table/{id} │
└──────────────────────────────────────────────────────────┘
Data Model Flow
Each stage transforms the data through a chain of models:
TableQuery # Raw query from database logs
├── query: str # SQL text
├── userName: str
├── startTime / endTime / duration
├── cost: float (optional)
├── databaseName / databaseSchema
└── analysisDate: timestamp
│
▼ QueryParserProcessor
ParsedData # Parsed single query
├── tables: List[str] # All referenced tables
├── joins: Dict[table → columns] # JOIN information
├── query_type: str # SELECT, INSERT, etc.
├── sql: str # Cleaned SQL
└── userName, date, duration, cost
│
▼ TableUsageStage (aggregation)
TableUsageCount # Per table, per date
├── table: str
├── date: str
├── count: int # Number of queries
├── sqlQueries: List[str]
└── joins: List[TableColumnJoin]
│
▼ MetadataUsageBulkSink
UsageRequest → POST /api/v1/usage/table/{id}
TableJoins → POST frequently joined tables
Query entity → POST /api/v1/queries
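The chain above can be sketched with plain dataclasses. Field names follow the diagram; the real models are generated Pydantic classes in the OpenMetadata codebase, so treat this as an illustration only:

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

@dataclass
class TableQuery:
    """Raw query pulled from database logs (Stage 1 output)."""
    query: str
    userName: Optional[str] = None
    duration: Optional[float] = None
    cost: Optional[float] = None
    analysisDate: Optional[str] = None

@dataclass
class ParsedData:
    """A single parsed query (Stage 2 output)."""
    tables: List[str]
    joins: Dict[str, list]
    query_type: str
    sql: str

@dataclass
class TableUsageCount:
    """Per-(table, date) aggregate (Stage 3 output)."""
    table: str
    date: str
    count: int = 0
    sqlQueries: List[str] = field(default_factory=list)
    joins: list = field(default_factory=list)
```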
Stage 1: Query Log Collection
UsageSource (source/database/usage_source.py) extends QueryParserSource and fetches raw query logs. Two modes:
Database Query Mode
Each database connector provides SQL to fetch from system views:
| Database | System View |
|---|---|
| PostgreSQL | pg_stat_statements |
| Snowflake | SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY |
| BigQuery | INFORMATION_SCHEMA.JOBS |
| MySQL | performance_schema.events_statements_summary_by_digest |
| Redshift | stl_querytext + stl_query |
Configuration controls:
queryLogDuration — how many days back to look (default: 1)
resultLimit — max queries to fetch (default: 100,000)
filterCondition — custom SQL WHERE clause
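In practice, queryLogDuration amounts to a sliding time window over the log source. A minimal sketch of the date math (the real connectors interpolate these bounds into dialect-specific SQL against the system views above; the function name is illustrative):

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple

def query_log_window(
    query_log_duration_days: int = 1, now: Optional[datetime] = None
) -> Tuple[datetime, datetime]:
    """Return the (start, end) timestamps a usage run would scan."""
    end = now or datetime.utcnow()
    start = end - timedelta(days=query_log_duration_days)
    return start, end

# With the default of 1 day, the window covers the last 24 hours.
start, end = query_log_window(1, now=datetime(2024, 6, 2, 12, 0))
```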
File Mode
Set queryLogFilePath to a CSV file containing query logs. The file is parsed into TableQuery objects.
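A hedged sketch of file mode, assuming CSV columns named query_text, user_name, and start_time — the actual column schema is defined by the connector, so these names are placeholders:

```python
import csv
import tempfile

def read_query_log(path):
    """Parse a query-log CSV into TableQuery-like dicts."""
    with open(path, newline="") as handle:
        return [
            {
                "query": row["query_text"],
                "userName": row.get("user_name"),
                "startTime": row.get("start_time"),
            }
            for row in csv.DictReader(handle)
        ]

# Demo with a throwaway CSV file (column names are assumptions)
with tempfile.NamedTemporaryFile(
    "w", suffix=".csv", delete=False, newline=""
) as tmp:
    tmp.write("query_text,user_name,start_time\n")
    tmp.write("SELECT * FROM orders,alice,2024-01-01T00:00:00\n")
    log_path = tmp.name

records = read_query_log(log_path)
```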
Stage 2: SQL Parsing
QueryParserProcessor (processor/query_parser.py) calls parse_sql_statement() for each query.
Parsing Pipeline
parse_sql_statement(record: TableQuery)
1. Clean the SQL (remove comments, normalize whitespace)
2. Create LineageParser(query, dialect)
├── Try SqlGlot analyzer (30s timeout)
├── Fallback to SqlFluff analyzer (30s timeout)
└── Fallback to SqlParse analyzer
3. Extract: involved_tables, table_joins, query_type
4. Format table names with get_formatted_entity_name()
5. Return ParsedData
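The cascading fallback in step 2 boils down to "try parsers in order of strictness." A generic sketch with stand-in analyzers — the real code wraps SqlGlot, SqlFluff, and SqlParse and adds per-parser timeouts, so everything below is illustrative:

```python
import re
from typing import Callable, List, Optional

def extract_tables(sql: str) -> list:
    """Naive table extraction: grab identifiers after FROM/JOIN."""
    return re.findall(r"(?:FROM|JOIN)\s+(\w+)", sql, re.IGNORECASE)

def strict_analyzer(sql: str) -> list:
    """Stand-in for a strict parser: only handles SELECT statements."""
    if not sql.lstrip().upper().startswith("SELECT"):
        raise ValueError("unsupported statement")
    return extract_tables(sql)

def lenient_analyzer(sql: str) -> list:
    """Stand-in for a lenient fallback parser."""
    return extract_tables(sql)

def parse_with_fallback(
    sql: str, analyzers: List[Callable[[str], list]]
) -> Optional[list]:
    """Try each analyzer in order; return the first non-empty result."""
    for analyze in analyzers:
        try:
            result = analyze(sql)
            if result:
                return result
        except Exception:
            continue  # fall through to the next, more lenient parser
    return None

# The strict analyzer rejects this DELETE; the lenient one recovers it.
tables = parse_with_fallback(
    "DELETE FROM orders WHERE id = 1",
    [strict_analyzer, lenient_analyzer],
)
```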
The LineageParser (lineage/parser.py) is shared with lineage ingestion — usage ingestion uses the table extraction capabilities, while lineage ingestion additionally uses the source/target direction analysis.
From a query like:
SELECT o.id, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.date > '2024-01-01'
The parser extracts:
- tables: ["orders", "customers"]
- joins: {"orders": [{"columnName": "customer_id", "joinedWith": {"table": "customers", "column": "id"}}]}
- query_type: "SELECT"
Stage 3: Aggregation
TableUsageStage (stage/table_usage.py) groups parsed queries and writes to temporary files.
Aggregation Logic
for each ParsedData:
for each table in parsed.tables:
key = (table, date)
usage_map[key].count += 1
usage_map[key].joins.extend(parsed.joins)
usage_map[key].sqlQueries.append(parsed.sql)
# Separately aggregate query costs
cost_key = (query_hash, date)
cost_map[cost_key].cost += parsed.cost
cost_map[cost_key].totalDuration += parsed.duration
cost_map[cost_key].count += 1
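The pseudocode above as a runnable sketch, using plain dicts in place of the real TableUsageCount model (cost aggregation omitted for brevity):

```python
from collections import defaultdict

def aggregate_usage(parsed_queries):
    """Group parsed queries into per-(table, date) usage counts."""
    usage_map = defaultdict(lambda: {"count": 0, "joins": [], "sqlQueries": []})
    for parsed in parsed_queries:
        for table in parsed["tables"]:
            entry = usage_map[(table, parsed["date"])]
            entry["count"] += 1
            entry["joins"].extend(parsed.get("joins", []))
            entry["sqlQueries"].append(parsed["sql"])
    return dict(usage_map)

usage = aggregate_usage([
    {"tables": ["orders"], "date": "2024-01-01", "sql": "SELECT 1", "joins": []},
    {"tables": ["orders", "customers"], "date": "2024-01-01",
     "sql": "SELECT 2", "joins": []},
])
# usage[("orders", "2024-01-01")]["count"] == 2
```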
Staging Files
Two types of JSON Lines files are written to the staging directory:
- Usage files: {serviceName}_{date} — one TableUsageCount per line
- Cost files: {serviceName}_{date}_query — one QueryCostWrapper per line
This staged approach keeps memory usage bounded — records are written incrementally rather than held in memory.
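A sketch of that incremental write, assuming one JSON object per line and the {serviceName}_{date} naming shown above (the record shape is illustrative):

```python
import json
import os
import tempfile

def stage_record(staging_dir, service_name, date, record):
    """Append one record as a JSON line; nothing accumulates in memory."""
    os.makedirs(staging_dir, exist_ok=True)
    path = os.path.join(staging_dir, f"{service_name}_{date}")
    with open(path, "a") as handle:
        handle.write(json.dumps(record) + "\n")

# Demo: two records land as two lines in the same staging file.
staging = tempfile.mkdtemp()
stage_record(staging, "my_postgres", "2024-01-01", {"table": "orders", "count": 2})
stage_record(staging, "my_postgres", "2024-01-01", {"table": "customers", "count": 1})
with open(os.path.join(staging, "my_postgres_2024-01-01")) as handle:
    lines = handle.readlines()
```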
Stage 4: Publishing
MetadataUsageBulkSink (bulksink/metadata_usage.py) reads the staged files and publishes to OpenMetadata.
For Each Table Usage Record
- Resolve table entity — look up table by FQN via ES search + API fallback
- Publish usage count — POST /api/v1/usage/table/{id} with UsageRequest(date, count)
- Publish joins — send TableJoins with column-level join information (powers “Frequently Joined With” in the UI)
- Create Query entities — create/link Query entities to the table
- Update life cycle — infer created/modified/accessed dates from query types:
  SELECT → accessed
  INSERT/UPDATE → modified
  CREATE → created
After All Records
- Compute percentiles — POST /api/v1/usage/compute.percentile/{entityType}/{date} ranks tables by usage
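As a sketch, the usage-publish call reduces to an endpoint plus a small JSON body. Building both without an HTTP client — the real sink goes through the OpenMetadata Python client, so this helper is purely illustrative:

```python
def build_usage_request(base_url: str, table_id: str, date: str, count: int):
    """Build the usage-publish endpoint and request body for one table."""
    endpoint = f"{base_url}/api/v1/usage/table/{table_id}"
    payload = {"date": date, "count": count}
    return endpoint, payload

endpoint, payload = build_usage_request(
    "http://localhost:8585", "abc-123", "2024-01-01", 42
)
# endpoint == "http://localhost:8585/api/v1/usage/table/abc-123"
```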
Query Cost Records
For databases that report cost (Snowflake, BigQuery):
- Creates QueryCostRecord entities linked to queries by hash
- Tracks: total cost, execution count, total duration
Life Cycle Integration
LifeCycleQueryMixin (source/database/life_cycle_query_mixin.py) infers table lifecycle from query patterns:
| Query Type | Life Cycle Event |
|---|---|
| CREATE TABLE | Created |
| INSERT, UPDATE, DELETE, MERGE | Modified |
| SELECT | Accessed |
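The table above is essentially a lookup. A minimal sketch — the query-type keys are assumptions based on the table, not the mixin's exact constants:

```python
from typing import Optional

# Assumed mapping from parsed query type to life cycle event
LIFE_CYCLE_BY_QUERY_TYPE = {
    "CREATE": "Created",
    "INSERT": "Modified",
    "UPDATE": "Modified",
    "DELETE": "Modified",
    "MERGE": "Modified",
    "SELECT": "Accessed",
}

def life_cycle_event(query_type: str) -> Optional[str]:
    """Map a parsed query type to a life cycle event, if any."""
    return LIFE_CYCLE_BY_QUERY_TYPE.get(query_type.upper())
```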
Configuration
source:
type: postgres-usage # Database-specific usage source
serviceName: my_postgres
sourceConfig:
config:
type: DatabaseUsage
queryLogDuration: 1 # Days to look back
resultLimit: 100000 # Max queries to fetch
queryLogFilePath: "" # Optional: CSV file path
filterCondition: "" # Custom SQL WHERE clause
processor:
type: query-parser
config: {}
stage:
type: table-usage
config:
filename: /tmp/usage_staging # Staging directory
bulkSink:
type: metadata-usage
config:
filename: /tmp/usage_staging # Same staging directory
Performance Safeguards
- Query hash caching — 5,000-entry LRU cache for query lookups
- Parser timeout — 30 seconds max per query
- Memory limits — 100MB max per query parsing session
- Staged processing — files written incrementally, not held in memory
- Batch size control — configurable resultLimit per database
Key Design Patterns
| Pattern | Where | Why |
|---|---|---|
| Four-stage pipeline | Source → Processor → Stage → BulkSink | Separates concerns: collection, parsing, aggregation, publishing |
| Staged aggregation | TableUsageStage writes JSON Lines | Bounded memory usage for large query volumes |
| Shared parser | LineageParser used by both usage and lineage | Single SQL parsing engine, no duplication |
| Cascading parsers | SqlGlot → SqlFluff → SqlParse | Maximize parsing success across SQL dialects |
Key Files Quick Reference
| What you want to do | Start here |
|---|---|
| Understand the usage workflow | workflow/usage.py |
| See how query logs are fetched | source/database/usage_source.py |
| See database-specific query SQL | source/database/{dialect}/usage.py |
| Read the SQL parser | processor/query_parser.py → parse_sql_statement() |
| Understand aggregation logic | stage/table_usage.py |
| See how results are published | bulksink/metadata_usage.py |
| Read the underlying SQL parser | lineage/parser.py → LineageParser |
| See usage API methods | ometa/mixins/table_mixin.py → publish_table_usage() |
| See life cycle inference | source/database/life_cycle_query_mixin.py |
All paths above are relative to ingestion/src/metadata/ingestion/. For example, processor/query_parser.py means ingestion/src/metadata/ingestion/processor/query_parser.py.