Profiler Module

The profiler module lives at ingestion/src/metadata/profiler/. It computes statistical metrics on database tables (row counts, null ratios, histograms, etc.) and publishes the results back to the OpenMetadata server. This guide walks through the architecture top-down: from the workflow trigger, through the metrics system, down to how individual metrics are computed and published.

Directory Layout

profiler/
├── api/
│   └── models.py                    # DTOs: ProfilerResponse, ProfilerProcessorConfig
├── config.py                        # Config helpers for schema/database profiler settings
├── factory.py                       # Abstract factory for profiler interfaces
├── registry.py                      # MetricRegistry, TypeRegistry enums
├── metrics/                         # All metric definitions
│   ├── core.py                      # Base classes: Metric, StaticMetric, QueryMetric, etc.
│   ├── registry.py                  # Metrics enum — the central metric catalog
│   ├── static/                      # SQL aggregate metrics (COUNT, SUM, MIN, MAX, ...)
│   ├── composed/                    # Derived metrics (NullRatio, DuplicateCount, IQR, ...)
│   ├── window/                      # Percentile metrics (Median, Quartiles)
│   ├── hybrid/                      # Metrics combining queries + prior results (Histogram)
│   ├── system/                      # Database system metrics (DML ops, freshness)
│   └── pandas_metric_protocol.py    # Accumulator pattern for DataFrame metrics
├── interface/                       # Database abstraction layer
│   ├── profiler_interface.py        # Abstract base
│   └── sqlalchemy/                  # SQL implementations (+ dialect overrides)
├── processor/                       # Core profiling logic
│   ├── core.py                      # Profiler class — orchestrates metric computation
│   ├── processor.py                 # ProfilerProcessor — ingestion framework step
│   ├── runner.py                    # QueryRunner / PandasRunner
│   ├── metric_filter.py             # Selects metrics by config + column type
│   └── models.py                    # Internal processor models
├── source/                          # Profiler data sources
│   ├── metadata.py                  # OpenMetadataSource — fetches tables from OM API
│   ├── fetcher/                     # Entity fetching strategies
│   └── database/                    # Database-specific profiler sources
│       └── base/
│           ├── profiler_source.py   # Creates the profiler runner
│           └── profiler_resolver.py # Resolves sampler + interface by engine
├── orm/                             # ORM utilities
│   ├── registry.py                  # Type registries, dialect maps, type classifiers
│   ├── converter/                   # DB-specific type converters
│   └── functions/                   # Custom SQL functions (SumFn, LenFn, etc.)
└── adaptors/                        # Adaptors for non-SQL sources (NoSQL, etc.)

Workflow Pipeline

The profiler follows the standard Source → Processor → Sink pattern defined in workflow/profiler.py:
┌──────────────────────────────────────────────────────────┐
│  OpenMetadataSource                                      │
│  profiler/source/metadata.py                             │
│                                                          │
│  Fetches from OM API:                                    │
│  • Tables matching service/database/schema filters       │
│  • Global profiler configuration                         │
│  • Database service connection                           │
│                                                          │
│  Yields: ProfilerSourceAndEntity                         │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  ProfilerProcessor                                       │
│  profiler/processor/processor.py                         │
│                                                          │
│  For each table:                                         │
│  1. Create profiler runner (Profiler instance)           │
│  2. Execute all metrics                                  │
│  3. Build CreateTableProfileRequest                      │
│                                                          │
│  Yields: ProfilerResponse                                │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  MetadataRestSink                                        │
│  ingestion/sink/metadata_rest.py                         │
│                                                          │
│  POST /api/v1/tables/{id}/tableProfile                   │
└──────────────────────────────────────────────────────────┘
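
The three stages above can be sketched as a minimal generator pipeline. This is a simplified stand-in, not the real framework classes: the actual steps add status tracking, retries, and error handling.

```python
from dataclasses import dataclass, field

@dataclass
class ProfilerResponse:
    """Hypothetical, stripped-down stand-in for the real DTO."""
    table: str
    profile: dict = field(default_factory=dict)

def source(tables):
    # Source: yield each table entity fetched from the OM API.
    for table in tables:
        yield table

def processor(entities):
    # Processor: run the profiler on each entity, yield a response.
    for entity in entities:
        profile = {"rowCount": 0}  # placeholder for computed metrics
        yield ProfilerResponse(table=entity, profile=profile)

def sink(responses):
    # Sink: publish each profile (here, just collect the responses).
    return list(responses)

results = sink(processor(source(["users", "orders"])))
```

Because each stage is a generator, tables stream through one at a time rather than being materialized up front.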

Metrics System

The metrics system is the heart of the profiler. Every metric is a class that knows how to compute itself against a data source.

Metric Type Hierarchy

Metric (ABC)                                # metrics/core.py

├── StaticMetric                            # SQL aggregate functions
│   fn(col, ...) → SQLAlchemy expression    # e.g., func.count(), func.avg()
│   Examples: Count, NullCount, Min, Max, Mean, Sum, StdDev,
│             DistinctCount, UniqueCount, MinLength, MaxLength

├── QueryMetric                             # Full query execution
│   query(col, ...) → SQLAlchemy Select     # Returns multiple rows
│   Examples: LikeCount, RegexCount

├── ComposedMetric                          # Derived from other metrics
│   fn(results) → value                     # Pure computation, no DB query
│   Examples: NullRatio, DistinctRatio, UniqueRatio,
│             DuplicateCount, InterQuartileRange

├── HybridMetric                            # Query + prior results
│   query(col, ...) → Select               # Needs results from earlier metrics
│   fn(results) → value
│   Examples: Histogram, CardinalityDistribution

├── WindowMetric                            # Percentile-based (window functions)
│   fn(col, ...) → SQLAlchemy expression
│   Examples: Median, FirstQuartile, ThirdQuartile

├── CustomMetric                            # User-defined SQL
│   sql(col, ...) → raw SQL string

└── SystemMetric                            # Database system-level
    sql(col, ...) → dialect-specific query
    Examples: DML operation counts, freshness

Metric Registry

All built-in metrics are registered in the Metrics enum (metrics/registry.py):
class Metrics(MetricRegistry):
    ROW_COUNT = RowCount           # Table-level
    COLUMN_COUNT = ColumnCount
    COUNT = Count                  # Column-level
    NULL_COUNT = NullCount
    DISTINCT_COUNT = DistinctCount
    MEAN = Mean
    # ... 40+ metrics
The MetricRegistry base class makes each enum member callable: Metrics.ROW_COUNT() instantiates a RowCount metric.
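
A minimal sketch of that callable-enum trick (the real MetricRegistry differs in detail):

```python
from enum import Enum

class RowCount:
    """Stand-in metric class; the real one builds a SQL expression."""
    def name(self):
        return "rowCount"

class MetricRegistry(Enum):
    def __call__(self, *args, **kwargs):
        # Delegate the call to the wrapped metric class, so that
        # Metrics.ROW_COUNT() returns a RowCount instance.
        return self.value(*args, **kwargs)

class Metrics(MetricRegistry):
    ROW_COUNT = RowCount

metric = Metrics.ROW_COUNT()   # instantiates RowCount
```

Defining `__call__` on the Enum subclass makes every member both a catalog entry and a factory for its metric class.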

Metric Computation Order

Metrics are computed in a strict dependency order:
1. Static Metrics    (SQL aggregates — independent)
2. Query Metrics     (full queries — independent)
3. Window Metrics    (percentiles — independent)
4. System Metrics    (database system queries — independent)
5. Custom Metrics    (user SQL — independent)

        ▼ results available
6. Composed Metrics  (derived from 1-5, e.g., NullRatio = NullCount / Count)

        ▼ composed results available
7. Hybrid Metrics    (need both query + prior results, e.g., Histogram)
Steps 1–5 run in parallel via a thread pool. Steps 6–7 run sequentially per column after the parallel phase completes.
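
The composed phase is pure arithmetic over the static-phase results. A sketch of the NullRatio-style derivation (dictionary key names are illustrative, not the exact result schema):

```python
def run_composed_metrics(results: dict) -> dict:
    """Derive composed metrics from already-computed static results.

    `results` holds the static-phase values for one column, e.g.
    {"nullCount": 5, "valuesCount": 100}. No database query is issued.
    """
    count = results.get("valuesCount")
    null_count = results.get("nullCount")
    if count:  # guard against division by zero on empty columns
        results["nullProportion"] = null_count / count
    return results

profile = run_composed_metrics({"nullCount": 5, "valuesCount": 100})
# profile["nullProportion"] == 0.05
```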

Profiler Execution Call Chain

# 1. PROCESSOR receives a table to profile
ProfilerProcessor._run(ProfilerSourceAndEntity)
    └── profiler_source.get_profiler_runner(entity, config)
        # source/database/base/profiler_source.py
        # Resolves the correct ProfilerInterface and SamplerInterface
        # Returns a Profiler instance

# 2. PROFILER orchestrates computation
    └── Profiler.process()                    # processor/core.py
        ├── compute_metrics()
        │   ├── profile_entity()
        │   │   ├── _prepare_table_metrics()    → ThreadPoolMetrics[]
        │   │   ├── _prepare_column_metrics()   → ThreadPoolMetrics[]
        │   │   ├── _prepare_system_metrics()   → ThreadPoolMetrics[]
        │   │   └── interface.get_all_metrics(all_metrics)
        │   │
        │   │   # 3. INTERFACE executes in thread pool
        │   │       ├── ThreadPoolExecutor(max_workers=5..20)
        │   │       ├── For each ThreadPoolMetrics:
        │   │       │   └── compute_metrics_in_thread()
        │   │       │       ├── Create thread-local QueryRunner
        │   │       │       └── Dispatch by metric type:
        │   │       │           ├── _compute_table_metrics()
        │   │       │           ├── _compute_static_metrics()
        │   │       │           ├── _compute_query_metrics()
        │   │       │           ├── _compute_window_metrics()
        │   │       │           ├── _compute_custom_metrics()
        │   │       │           └── _compute_system_metrics()
        │   │       └── Aggregate results → {table: {}, columns: {}, system: []}
        │   │
        │   ├── For each column:
        │   │   ├── run_composed_metrics(col)  # Uses static results
        │   │   └── run_hybrid_metrics(col)    # Uses composed results
        │   └── return self

        └── get_profile()                       # Transform to API model
            └── return ProfilerResponse(table, CreateTableProfileRequest)

Threading Model

The profiler uses a thread pool to parallelize metric computation across columns:
┌──────────────────────────────────────────────────┐
│ ThreadPoolExecutor (5–20 threads)                │
│                                                  │
│  Thread 1: table metrics (RowCount, ColCount)    │
│  Thread 2: column "id" static metrics            │
│  Thread 3: column "name" static metrics          │
│  Thread 4: column "email" static metrics         │
│  Thread 5: system metrics                        │
│  ...                                             │
└──────────────────────────────────────────────────┘

          ▼  all threads complete
┌──────────────────────────────────────────────────┐
│ Sequential: composed → hybrid per column         │
└──────────────────────────────────────────────────┘
Each thread gets its own QueryRunner instance (_create_thread_safe_runner()) with a dedicated database session. Thread count scales dynamically based on the number of tasks (min 5, max 20).
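
The parallel phase can be sketched with concurrent.futures. The runner and metric computation here are stubs; the point is the per-task runner creation and the min-5/max-20 worker scaling:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def compute_metrics_in_thread(task: str):
    # Each thread builds its own runner, so no database session
    # is ever shared between threads.
    runner = {"column": task}            # stand-in for a thread-local QueryRunner
    return task, len(runner["column"])   # stand-in for the computed metric values

tasks = ["id", "name", "email"]
max_workers = min(20, max(5, len(tasks)))   # scale workers between 5 and 20

results = {}
with ThreadPoolExecutor(max_workers=max_workers) as pool:
    futures = [pool.submit(compute_metrics_in_thread, t) for t in tasks]
    for future in as_completed(futures):
        col, value = future.result()
        results[col] = value
```

Only after the pool drains does the sequential composed/hybrid phase read from `results`.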

Metric Filtering

Not all metrics apply to all columns. MetricFilter (processor/metric_filter.py) selects metrics based on:
  1. Column data type — numeric metrics skip string columns (uses orm/registry.py classifiers like is_quantifiable(), is_concatenable())
  2. Global profiler config — admin can enable/disable specific metrics
  3. Table-level config — per-table metric overrides
  4. Database service type — some metrics only work on specific databases
# orm/registry.py type classifiers
is_quantifiable(col_type)   # → numeric metrics (Mean, StdDev, Sum, ...)
is_concatenable(col_type)   # → string metrics (MinLength, MaxLength, ...)
is_date_time(col_type)      # → temporal metrics
NOT_COMPUTE                 # → set of types to skip entirely (ARRAY, JSON, ...)
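
A simplified sketch of type-based filtering. The classifier names come from orm/registry.py, but their bodies and the metric groupings below are illustrative stubs:

```python
NUMERIC_METRICS = {"mean", "stddev", "sum"}
STRING_METRICS = {"minLength", "maxLength"}

def is_quantifiable(col_type: str) -> bool:
    # Stub classifier: the real one inspects SQLAlchemy types.
    return col_type in {"INT", "FLOAT", "DECIMAL"}

def is_concatenable(col_type: str) -> bool:
    # Stub classifier for string-like types.
    return col_type in {"VARCHAR", "TEXT"}

def filter_metrics(metrics: set, col_type: str) -> list:
    """Keep only the metrics that make sense for this column type."""
    keep = set()
    if is_quantifiable(col_type):
        keep |= NUMERIC_METRICS
    if is_concatenable(col_type):
        keep |= STRING_METRICS
    return sorted(m for m in metrics if m in keep)

filter_metrics({"mean", "minLength", "sum"}, "INT")  # drops minLength
```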

ORM Layer

The orm/ directory bridges SQLAlchemy types with OpenMetadata’s type system:
  • registry.py — PythonDialects maps service types to SQLAlchemy dialects; CustomTypes handles special types (UUID, BYTES, ARRAY)
  • converter/ — database-specific converters (BigQuery, Snowflake, etc.) that map SQLAlchemy column types to OpenMetadata column types
  • functions/ — custom SQL functions used by metrics (e.g., SumFn, LenFn) that handle dialect differences
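
The dialect-handling idea behind a function like LenFn can be sketched with SQLAlchemy's compiler extension. This is a simplified illustration of the mechanism, not the actual OpenMetadata implementation:

```python
from sqlalchemy.sql.functions import GenericFunction
from sqlalchemy.ext.compiler import compiles

class LenFn(GenericFunction):
    """String-length function that compiles differently per dialect."""
    name = "len"
    inherit_cache = True

@compiles(LenFn)
def _len_default(element, compiler, **kw):
    # Most databases spell string length as LENGTH(...)
    return f"LENGTH({compiler.process(element.clauses, **kw)})"

@compiles(LenFn, "mssql")
def _len_mssql(element, compiler, **kw):
    # SQL Server spells it LEN(...)
    return f"LEN({compiler.process(element.clauses, **kw)})"
```

A metric can then emit `LenFn(col)` once and let SQLAlchemy render the right SQL for whichever engine the profiler is connected to.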

Database Backend Pluggability

Each database can override the default profiler behavior:
interface/sqlalchemy/
├── profiler_interface.py              # SQAProfilerInterface (base SQL implementation)
├── bigquery/profiler_interface.py     # BigQuery overrides
├── snowflake/profiler_interface.py    # Snowflake overrides
├── databricks/profiler_interface.py   # Databricks overrides
├── trino/profiler_interface.py        # Trino overrides
└── ...
System metrics are also dialect-specific:
metrics/system/
├── system.py                          # SystemMetricsRegistry (auto-discovers impls)
├── bigquery/system.py                 # BigQuery: INFORMATION_SCHEMA queries
├── snowflake/system.py                # Snowflake: ACCOUNT_USAGE queries
├── redshift/system.py                 # Redshift: STL_INSERT, STL_DELETE queries
└── databricks/system.py               # Databricks: table history queries

Profiler Configuration

The profiler is configured via ProfilerProcessorConfig (api/models.py):
ProfilerProcessorConfig:
    profiler: ProfilerDef              # Which metrics to run, timeout
    tableConfig: List[TableConfig]     # Per-table overrides (columns, metrics)
    schemaConfig: List[...]            # Schema-level filters
    databaseConfig: List[...]          # Database-level filters

ProfilerDef:
    name: str
    timeout_seconds: int               # Per-table timeout
    metrics: List[ValidMetric]         # Validated against Metrics registry
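
The "validated against Metrics registry" step amounts to a membership check on the enum. A minimal sketch, using a stand-in two-member registry:

```python
from enum import Enum

class Metrics(Enum):
    # Stand-in for the real 40+ member registry enum.
    ROW_COUNT = "rowCount"
    MEAN = "mean"

def validate_metric_names(names: list) -> list:
    """Reject any metric name not present in the Metrics registry."""
    valid = set(Metrics.__members__)            # {"ROW_COUNT", "MEAN"}
    unknown = [n for n in names if n.upper() not in valid]
    if unknown:
        raise ValueError(f"Unknown metrics: {unknown}")
    return names

validate_metric_names(["mean", "row_count"])    # passes
```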

Adding a New Metric

Step 1: Choose the metric type

Decide which base class fits your metric:

If your metric…                               Extend
Is a SQL aggregate (one value per column)     StaticMetric
Needs a full query (multiple rows)            QueryMetric
Is derived from other metrics (no DB query)   ComposedMetric
Needs both a query and prior metric results   HybridMetric
Uses window functions (percentiles)           WindowMetric
Step 2: Implement the metric class

Create a file in the appropriate metrics/ subdirectory. For a StaticMetric, implement fn() returning a SQLAlchemy expression:
from metadata.profiler.metrics.core import StaticMetric, _label

class MyMetric(StaticMetric):
    @classmethod
    def name(cls):
        return "myMetric"

    @classmethod
    def is_col_metric(cls):
        return True

    @_label
    def fn(self, col, ...):
        return func.my_aggregate(col)
For a ComposedMetric, implement fn() using prior results:
class MyRatio(ComposedMetric):
    @classmethod
    def required_metrics(cls):
        return [MyMetric, Count]  # Must run before this metric

    @classmethod
    def fn(cls, res):
        my_val = res.get(MyMetric.name())
        count = res.get(Count.name())
        return my_val / count if count else None
Step 3: Register in the Metrics enum

Add your metric to the Metrics enum in metrics/registry.py:
class Metrics(MetricRegistry):
    # ... existing metrics
    MY_METRIC = MyMetric
Step 4: Add type filtering (if needed)

If your metric only applies to certain column types, update MetricFilter in processor/metric_filter.py to filter appropriately.

Pandas / DataFrame Support

For non-SQL sources (datalakes, files), the profiler uses a Pandas-based interface with an accumulator pattern (metrics/pandas_metric_protocol.py):
class PandasComputation(Protocol[T, R]):
    def create_accumulator(self) -> T: ...                         # Init empty container
    def update_accumulator(self, acc: T, df: DataFrame) -> T: ...  # Process chunk
    def aggregate_accumulator(self, acc: T) -> R: ...              # Final result
This three-step pattern enables streaming computation over large datasets that don’t fit in memory — each DataFrame chunk updates the accumulator, and the final result is computed after all chunks are processed.
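
For example, a running mean can be computed chunk-by-chunk without ever holding the full dataset. This pure-Python stand-in uses lists in place of DataFrame chunks:

```python
class MeanComputation:
    """Accumulator-pattern mean: a (sum, count) pair merges across chunks."""

    def create_accumulator(self):
        return (0.0, 0)                     # (running sum, running count)

    def update_accumulator(self, acc, chunk):
        total, count = acc
        return total + sum(chunk), count + len(chunk)

    def aggregate_accumulator(self, acc):
        total, count = acc
        return total / count if count else None

comp = MeanComputation()
acc = comp.create_accumulator()
for chunk in ([1, 2, 3], [4, 5]):           # two "DataFrame chunks"
    acc = comp.update_accumulator(acc, chunk)
mean = comp.aggregate_accumulator(acc)      # 3.0
```

The accumulator stays O(1) in memory regardless of how many chunks stream through.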

Key Design Patterns

Pattern        Where                                                        Why
Registry       Metrics enum, SystemMetricsRegistry, TypeRegistry            Central catalog with callable instantiation
Thread Pool    SQAProfilerInterface.get_all_metrics()                       Parallelize metric computation across columns
Decorator      @add_props(table=table)                                      Dynamically attach table/column context to metric instances
Accumulator    PandasComputation protocol                                   Streaming computation for large DataFrames
Factory        ProfilerInterface.create(), ProfilerSourceFactory            Database-specific implementations via config
Strategy       Different _compute_*_metrics() methods dispatched by type    Each metric type has its own execution strategy

Key Files Quick Reference

What you want to do                        Start here
Understand the workflow pipeline           workflow/profiler.py
See how tables are fetched and filtered    profiler/source/metadata.py
Read the core profiler orchestration       profiler/processor/core.py (Profiler class)
See how metrics execute in threads         profiler/interface/sqlalchemy/profiler_interface.py (get_all_metrics())
Read metric base classes                   profiler/metrics/core.py
Browse all built-in metrics                profiler/metrics/registry.py
See a static metric implementation         profiler/metrics/static/mean.py
See a composed metric                      profiler/metrics/composed/null_ratio.py
Understand column type filtering           profiler/orm/registry.py (is_quantifiable(), etc.)
See how results are published              ingestion/sink/metadata_rest.py (write_profiler_response())
Add database-specific overrides            profiler/interface/sqlalchemy/{dialect}/
All paths above are relative to ingestion/src/metadata/. For example, profiler/processor/core.py means ingestion/src/metadata/profiler/processor/core.py.