Profiler Module
The profiler module lives at ingestion/src/metadata/profiler/. It computes statistical metrics on database tables (row counts, null ratios, histograms, etc.) and publishes the results back to the OpenMetadata server.
This guide walks through the architecture top-down: from the workflow trigger, through the metrics system, down to how individual metrics are computed and published.
Directory Layout
profiler/
├── api/
│   └── models.py                 # DTOs: ProfilerResponse, ProfilerProcessorConfig
├── config.py                     # Config helpers for schema/database profiler settings
├── factory.py                    # Abstract factory for profiler interfaces
├── registry.py                   # MetricRegistry, TypeRegistry enums
├── metrics/                      # All metric definitions
│   ├── core.py                   # Base classes: Metric, StaticMetric, QueryMetric, etc.
│   ├── registry.py               # Metrics enum — the central metric catalog
│   ├── static/                   # SQL aggregate metrics (COUNT, SUM, MIN, MAX, ...)
│   ├── composed/                 # Derived metrics (NullRatio, DuplicateCount, IQR, ...)
│   ├── window/                   # Percentile metrics (Median, Quartiles)
│   ├── hybrid/                   # Metrics combining queries + prior results (Histogram)
│   ├── system/                   # Database system metrics (DML ops, freshness)
│   └── pandas_metric_protocol.py # Accumulator pattern for DataFrame metrics
├── interface/                    # Database abstraction layer
│   ├── profiler_interface.py     # Abstract base
│   └── sqlalchemy/               # SQL implementations (+ dialect overrides)
├── processor/                    # Core profiling logic
│   ├── core.py                   # Profiler class — orchestrates metric computation
│   ├── processor.py              # ProfilerProcessor — ingestion framework step
│   ├── runner.py                 # QueryRunner / PandasRunner
│   ├── metric_filter.py          # Selects metrics by config + column type
│   └── models.py                 # Internal processor models
├── source/                       # Profiler data sources
│   ├── metadata.py               # OpenMetadataSource — fetches tables from OM API
│   ├── fetcher/                  # Entity fetching strategies
│   └── database/                 # Database-specific profiler sources
│       └── base/
│           ├── profiler_source.py   # Creates the profiler runner
│           └── profiler_resolver.py # Resolves sampler + interface by engine
├── orm/                          # ORM utilities
│   ├── registry.py               # Type registries, dialect maps, type classifiers
│   ├── converter/                # DB-specific type converters
│   └── functions/                # Custom SQL functions (SumFn, LenFn, etc.)
└── adaptors/                     # Adaptors for non-SQL sources (NoSQL, etc.)
Workflow Pipeline
The profiler follows the standard Source → Processor → Sink pattern defined in workflow/profiler.py:
┌──────────────────────────────────────────────────────────┐
│ OpenMetadataSource │
│ profiler/source/metadata.py │
│ │
│ Fetches from OM API: │
│ • Tables matching service/database/schema filters │
│ • Global profiler configuration │
│ • Database service connection │
│ │
│ Yields: ProfilerSourceAndEntity │
└──────────────────────┬───────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ ProfilerProcessor │
│ profiler/processor/processor.py │
│ │
│ For each table: │
│ 1. Create profiler runner (Profiler instance) │
│ 2. Execute all metrics │
│ 3. Build CreateTableProfileRequest │
│ │
│ Yields: ProfilerResponse │
└──────────────────────┬───────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ MetadataRestSink │
│ ingestion/sink/metadata_rest.py │
│ │
│ POST /api/v1/tables/{id}/tableProfile │
└──────────────────────────────────────────────────────────┘
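The three stages compose like ordinary Python generators: each step consumes what the previous one yields. A minimal sketch of the pattern (the `source`/`processor`/`sink` functions and table names here are illustrative, not the real ingestion framework API):

```python
def source():
    # Stands in for OpenMetadataSource: yields one work item per table
    for table in ["dim_users", "fact_orders"]:
        yield table  # real code yields ProfilerSourceAndEntity

def processor(records):
    # Stands in for ProfilerProcessor: maps each table to a profile result
    for table in records:
        yield f"profile({table})"  # real code yields ProfilerResponse

def sink(responses):
    # Stands in for MetadataRestSink: publishes each result to the server
    return [f"POST {r}" for r in responses]

print(sink(processor(source())))
# → ['POST profile(dim_users)', 'POST profile(fact_orders)']
```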
Metrics System
The metrics system is the heart of the profiler. Every metric is a class that knows how to compute itself against a data source.
Metric Type Hierarchy
Metric (ABC) # metrics/core.py
│
├── StaticMetric # SQL aggregate functions
│ fn(col, ...) → SQLAlchemy expression # e.g., func.count(), func.avg()
│ Examples: Count, NullCount, Min, Max, Mean, Sum, StdDev,
│ DistinctCount, UniqueCount, MinLength, MaxLength
│
├── QueryMetric # Full query execution
│ query(col, ...) → SQLAlchemy Select # Returns multiple rows
│ Examples: LikeCount, RegexCount
│
├── ComposedMetric # Derived from other metrics
│ fn(results) → value # Pure computation, no DB query
│ Examples: NullRatio, DistinctRatio, UniqueRatio,
│ DuplicateCount, InterQuartileRange
│
├── HybridMetric # Query + prior results
│ query(col, ...) → Select # Needs results from earlier metrics
│ fn(results) → value
│ Examples: Histogram, CardinalityDistribution
│
├── WindowMetric # Percentile-based (window functions)
│ fn(col, ...) → SQLAlchemy expression
│ Examples: Median, FirstQuartile, ThirdQuartile
│
├── CustomMetric # User-defined SQL
│ sql(col, ...) → raw SQL string
│
└── SystemMetric # Database system-level
      sql(col, ...) → dialect-specific query
      Examples: DML operation counts, freshness
Metric Registry
All built-in metrics are registered in the Metrics enum (metrics/registry.py):
class Metrics(MetricRegistry):
    ROW_COUNT = RowCount             # Table-level
    COLUMN_COUNT = ColumnCount
    COUNT = Count                    # Column-level
    NULL_COUNT = NullCount
    DISTINCT_COUNT = DistinctCount
    MEAN = Mean
    # ... 40+ metrics
The MetricRegistry base class makes each enum member callable — Metrics.ROW_COUNT() instantiates a RowCount metric.
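The mechanism is a small piece of Enum machinery. A self-contained sketch of how a callable registry can work (illustrative, not the actual MetricRegistry source):

```python
from enum import Enum

class RowCount:
    """Stand-in for a real metric class."""
    def name(self):
        return "rowCount"

class MetricRegistry(Enum):
    """Members hold metric classes; calling a member instantiates its class."""
    def __call__(self, *args, **kwargs):
        return self.value(*args, **kwargs)  # self.value is the metric class

class Metrics(MetricRegistry):
    ROW_COUNT = RowCount

metric = Metrics.ROW_COUNT()  # instantiates RowCount
print(metric.name())          # → rowCount
```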
Metric Computation Order
Metrics are computed in a strict dependency order:
1. Static Metrics (SQL aggregates — independent)
2. Query Metrics (full queries — independent)
3. Window Metrics (percentiles — independent)
4. System Metrics (database system queries — independent)
5. Custom Metrics (user SQL — independent)
│
▼ results available
6. Composed Metrics (derived from 1-5, e.g., NullRatio = NullCount / Count)
│
▼ composed results available
7. Hybrid Metrics (need both query + prior results, e.g., Histogram)
Steps 1–5 run in parallel via a thread pool. Steps 6–7 run sequentially per column after the parallel phase completes.
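The hand-off between phases can be sketched as follows (metric names and numbers are illustrative): once the parallel phase has filled a results dict, a composed metric is just a pure function over it.

```python
# Results gathered by the parallel phase (steps 1-5); numbers are made up.
static_results = {"nullCount": 5, "valuesCount": 100}

def null_ratio(results):
    """Composed metric: derived purely from earlier results, no DB query."""
    null_count = results.get("nullCount")
    values_count = results.get("valuesCount")
    if not values_count:
        return None  # avoid division by zero on empty columns
    return null_count / values_count

print(null_ratio(static_results))  # → 0.05
```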
Profiler Execution Call Chain
# 1. PROCESSOR receives a table to profile
ProfilerProcessor._run(ProfilerSourceAndEntity)
└── profiler_source.get_profiler_runner(entity, config)
# source/database/base/profiler_source.py
# Resolves the correct ProfilerInterface and SamplerInterface
# Returns a Profiler instance
# 2. PROFILER orchestrates computation
└── Profiler.process() # processor/core.py
├── compute_metrics()
│ ├── profile_entity()
│ │ ├── _prepare_table_metrics() → ThreadPoolMetrics[]
│ │ ├── _prepare_column_metrics() → ThreadPoolMetrics[]
│ │ ├── _prepare_system_metrics() → ThreadPoolMetrics[]
│ │ └── interface.get_all_metrics(all_metrics)
│ │
│ │ # 3. INTERFACE executes in thread pool
│ │ ├── ThreadPoolExecutor(max_workers=5..20)
│ │ ├── For each ThreadPoolMetrics:
│ │ │ └── compute_metrics_in_thread()
│ │ │ ├── Create thread-local QueryRunner
│ │ │ └── Dispatch by metric type:
│ │ │ ├── _compute_table_metrics()
│ │ │ ├── _compute_static_metrics()
│ │ │ ├── _compute_query_metrics()
│ │ │ ├── _compute_window_metrics()
│ │ │ ├── _compute_custom_metrics()
│ │ │ └── _compute_system_metrics()
│ │ └── Aggregate results → {table: {}, columns: {}, system: []}
│ │
│ ├── For each column:
│ │ ├── run_composed_metrics(col) # Uses static results
│ │ └── run_hybrid_metrics(col) # Uses composed results
│ └── return self
│
└── get_profile() # Transform to API model
└── return ProfilerResponse(table, CreateTableProfileRequest)
Threading Model
The profiler uses a thread pool to parallelize metric computation across columns:
┌─────────────────────────────────────────────────┐
│ ThreadPoolExecutor (5–20 threads) │
│ │
│ Thread 1: table metrics (RowCount, ColCount) │
│ Thread 2: column "id" static metrics │
│ Thread 3: column "name" static metrics │
│ Thread 4: column "email" static metrics │
│ Thread 5: system metrics │
│ ... │
└─────────────────────────────────────────────────┘
│
▼ all threads complete
┌─────────────────────────────────────────────────┐
│ Sequential: composed → hybrid per column │
└─────────────────────────────────────────────────┘
Each thread gets its own QueryRunner instance (_create_thread_safe_runner()) with a dedicated database session. Thread count scales dynamically based on the number of tasks (min 5, max 20).
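The fan-out/fan-in shape can be sketched as below (a minimal illustration with made-up task names; the real worker builds a thread-local QueryRunner with its own session rather than a plain dict):

```python
from concurrent.futures import ThreadPoolExecutor

def compute_in_thread(task: str) -> str:
    # Stand-in for compute_metrics_in_thread(): each call builds its own
    # runner/session so no stateful object is shared across threads.
    runner = {"task": task}
    return f"computed {runner['task']}"

tasks = ["table metrics", "col id", "col name", "col email", "system"]
workers = min(max(len(tasks), 5), 20)  # scale with task count, clamp to 5..20

with ThreadPoolExecutor(max_workers=workers) as pool:
    results = list(pool.map(compute_in_thread, tasks))  # fan-out, then join

print(results[0])  # → computed table metrics
```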
Metric Filtering
Not all metrics apply to all columns. MetricFilter (processor/metric_filter.py) selects metrics based on:
- Column data type — numeric metrics skip string columns (uses
orm/registry.py classifiers like is_quantifiable(), is_concatenable())
- Global profiler config — admin can enable/disable specific metrics
- Table-level config — per-table metric overrides
- Database service type — some metrics only work on specific databases
# orm/registry.py type classifiers
is_quantifiable(col_type) # → numeric metrics (Mean, StdDev, Sum, ...)
is_concatenable(col_type) # → string metrics (MinLength, MaxLength, ...)
is_date_time(col_type) # → temporal metrics
NOT_COMPUTE # → set of types to skip entirely (ARRAY, JSON, ...)
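Putting the classifiers to work, metric selection might look like this sketch (the type sets and selection logic are illustrative; the real MetricFilter also consults global and table-level config):

```python
# Illustrative type sets; the real classifiers inspect SQLAlchemy types.
NUMERIC_TYPES = {"INT", "FLOAT", "DECIMAL"}
STRING_TYPES = {"VARCHAR", "TEXT", "CHAR"}

def is_quantifiable(col_type: str) -> bool:
    return col_type in NUMERIC_TYPES

def is_concatenable(col_type: str) -> bool:
    return col_type in STRING_TYPES

def select_metrics(col_type: str) -> list:
    metrics = ["valuesCount", "nullCount"]    # always applicable
    if is_quantifiable(col_type):
        metrics += ["mean", "stddev", "sum"]  # numeric columns only
    if is_concatenable(col_type):
        metrics += ["minLength", "maxLength"] # string columns only
    return metrics

print(select_metrics("VARCHAR"))
# → ['valuesCount', 'nullCount', 'minLength', 'maxLength']
```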
ORM Layer
The orm/ directory bridges SQLAlchemy types with OpenMetadata’s type system:
registry.py — PythonDialects maps service types to SQLAlchemy dialects; CustomTypes handles special types (UUID, BYTES, ARRAY)
converter/ — database-specific converters (BigQuery, Snowflake, etc.) that map SQLAlchemy column types to OpenMetadata column types
functions/ — custom SQL functions used by metrics (e.g., SumFn, LenFn) that handle dialect differences
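As an example of what functions/ smooths over: string length is LEN() on SQL Server but LENGTH() on most other databases. A plain-Python sketch of the dispatch (the real LenFn uses SQLAlchemy's compiler machinery rather than string formatting):

```python
def len_fn(column: str, dialect: str) -> str:
    """Render a string-length call using the dialect's own function name."""
    per_dialect = {"mssql": "LEN", "postgresql": "LENGTH", "mysql": "LENGTH"}
    fn = per_dialect.get(dialect, "LENGTH")  # LENGTH is the common default
    return f"{fn}({column})"

print(len_fn("name", "mssql"))  # → LEN(name)
```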
Database Backend Pluggability
Each database can override the default profiler behavior:
interface/sqlalchemy/
├── profiler_interface.py # SQAProfilerInterface (base SQL implementation)
├── bigquery/profiler_interface.py # BigQuery overrides
├── snowflake/profiler_interface.py # Snowflake overrides
├── databricks/profiler_interface.py # Databricks overrides
├── trino/profiler_interface.py # Trino overrides
└── ...
System metrics are also dialect-specific:
metrics/system/
├── system.py # SystemMetricsRegistry (auto-discovers impls)
├── bigquery/system.py # BigQuery: INFORMATION_SCHEMA queries
├── snowflake/system.py # Snowflake: ACCOUNT_USAGE queries
├── redshift/system.py # Redshift: STL_INSERT, STL_DELETE queries
└── databricks/system.py # Databricks: table history queries
Profiler Configuration
The profiler is configured via ProfilerProcessorConfig (api/models.py):
ProfilerProcessorConfig:
    profiler: ProfilerDef              # Which metrics to run, timeout
    tableConfig: List[TableConfig]     # Per-table overrides (columns, metrics)
    schemaConfig: List[...]            # Schema-level filters
    databaseConfig: List[...]          # Database-level filters

ProfilerDef:
    name: str
    timeout_seconds: int               # Per-table timeout
    metrics: List[ValidMetric]         # Validated against Metrics registry
Adding a New Metric
Choose the metric type
Decide which base class fits your metric:

| If your metric… | Extend |
|---|---|
| Is a SQL aggregate (one value per column) | StaticMetric |
| Needs a full query (multiple rows) | QueryMetric |
| Is derived from other metrics (no DB query) | ComposedMetric |
| Needs both a query and prior metric results | HybridMetric |
| Uses window functions (percentiles) | WindowMetric |
Implement the metric class
Create a file in the appropriate metrics/ subdirectory. For a StaticMetric, implement fn() returning a SQLAlchemy expression:

from sqlalchemy import func

from metadata.profiler.metrics.core import StaticMetric, _label

class MyMetric(StaticMetric):
    @classmethod
    def name(cls):
        return "myMetric"

    @classmethod
    def is_col_metric(cls):
        return True

    @_label
    def fn(self, col, ...):
        return func.my_aggregate(col)
For a ComposedMetric, implement fn() using prior results:

class MyRatio(ComposedMetric):
    @classmethod
    def required_metrics(cls):
        return [MyMetric, Count]  # Must run before this metric

    @classmethod
    def fn(cls, res):
        my_val = res.get(MyMetric.name())
        count = res.get(Count.name())
        return my_val / count if count else None
Register in the Metrics enum
Add your metric to the Metrics enum in metrics/registry.py:

class Metrics(MetricRegistry):
    # ... existing metrics
    MY_METRIC = MyMetric
Add type filtering (if needed)
If your metric only applies to certain column types, update MetricFilter in processor/metric_filter.py to filter appropriately.
Pandas / DataFrame Support
For non-SQL sources (datalakes, files), the profiler uses a Pandas-based interface with an accumulator pattern (metrics/pandas_metric_protocol.py):
class PandasComputation(Protocol[T, R]):
    def create_accumulator(self) -> T: ...                         # Init empty container
    def update_accumulator(self, acc: T, df: DataFrame) -> T: ...  # Process chunk
    def aggregate_accumulator(self, acc: T) -> R: ...              # Final result
This three-step pattern enables streaming computation over large datasets that don’t fit in memory — each DataFrame chunk updates the accumulator, and the final result is computed after all chunks are processed.
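A toy implementation of the protocol, computing a streaming mean (chunks are modeled as plain lists here to keep the sketch self-contained; the real computations receive pandas DataFrames):

```python
class MeanComputation:
    """Streaming mean over chunks, following the three-step accumulator shape."""

    def create_accumulator(self):
        return (0.0, 0)  # (running sum, running count)

    def update_accumulator(self, acc, chunk):
        total, count = acc
        return (total + sum(chunk), count + len(chunk))

    def aggregate_accumulator(self, acc):
        total, count = acc
        return total / count if count else None

comp = MeanComputation()
acc = comp.create_accumulator()
for chunk in ([1, 2, 3], [4, 5]):  # two "chunks" of a larger dataset
    acc = comp.update_accumulator(acc, chunk)
print(comp.aggregate_accumulator(acc))  # → 3.0
```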
Key Design Patterns
| Pattern | Where | Why |
|---|---|---|
| Registry | Metrics enum, SystemMetricsRegistry, TypeRegistry | Central catalog with callable instantiation |
| Thread Pool | SQAProfilerInterface.get_all_metrics() | Parallelize metric computation across columns |
| Decorator | @add_props(table=table) | Dynamically attach table/column context to metric instances |
| Accumulator | PandasComputation protocol | Streaming computation for large DataFrames |
| Factory | ProfilerInterface.create(), ProfilerSourceFactory | Database-specific implementations via config |
| Strategy | Different _compute_*_metrics() methods dispatched by type | Each metric type has its own execution strategy |
Key Files Quick Reference
| What you want to do | Start here |
|---|---|
| Understand the workflow pipeline | workflow/profiler.py |
| See how tables are fetched and filtered | profiler/source/metadata.py |
| Read the core profiler orchestration | profiler/processor/core.py → Profiler class |
| See how metrics execute in threads | profiler/interface/sqlalchemy/profiler_interface.py → get_all_metrics() |
| Read metric base classes | profiler/metrics/core.py |
| Browse all built-in metrics | profiler/metrics/registry.py |
| See a static metric implementation | profiler/metrics/static/mean.py |
| See a composed metric | profiler/metrics/composed/null_ratio.py |
| Understand column type filtering | profiler/orm/registry.py → is_quantifiable(), etc. |
| See how results are published | ingestion/sink/metadata_rest.py → write_profiler_response() |
| Add database-specific overrides | profiler/interface/sqlalchemy/{dialect}/ |
All paths above are relative to ingestion/src/metadata/. For example, profiler/processor/core.py means ingestion/src/metadata/profiler/processor/core.py.