
Documentation Index

Fetch the complete documentation index at: https://docs.open-metadata.org/llms.txt

Use this file to discover all available pages before exploring further.

Profiler Module: Execution & Extension

This page covers the execution internals and extension points. For the architecture overview and metrics system, see Profiler Module | Architecture & Metrics System.

Profiler Execution Call Chain

# 1. PROCESSOR receives a table to profile
ProfilerProcessor._run(ProfilerSourceAndEntity)
    └── profiler_source.get_profiler_runner(entity, config)
        # source/database/base/profiler_source.py
        # Resolves the correct ProfilerInterface and SamplerInterface
        # Returns a Profiler instance

# 2. PROFILER orchestrates computation
    └── Profiler.process()                    # processor/core.py
        ├── compute_metrics()
        │   ├── profile_entity()
        │   │   ├── _prepare_table_metrics()    → ThreadPoolMetrics[]
        │   │   ├── _prepare_column_metrics()   → ThreadPoolMetrics[]
        │   │   ├── _prepare_system_metrics()   → ThreadPoolMetrics[]
        │   │   └── interface.get_all_metrics(all_metrics)
        │   │
        │   │   # 3. INTERFACE executes in thread pool
        │   │       ├── ThreadPoolExecutor(max_workers=5..20)
        │   │       ├── For each ThreadPoolMetrics:
        │   │       │   └── compute_metrics_in_thread()
        │   │       │       ├── Create thread-local QueryRunner
        │   │       │       └── Dispatch by metric type:
        │   │       │           ├── _compute_table_metrics()
        │   │       │           ├── _compute_static_metrics()
        │   │       │           ├── _compute_query_metrics()
        │   │       │           ├── _compute_window_metrics()
        │   │       │           ├── _compute_custom_metrics()
        │   │       │           └── _compute_system_metrics()
        │   │       └── Aggregate results → {table: {}, columns: {}, system: []}
        │   │
        │   ├── For each column:
        │   │   ├── run_composed_metrics(col)  # Uses static results
        │   │   └── run_hybrid_metrics(col)    # Uses composed results
        │   └── return self

        └── get_profile()                       # Transform to API model
            └── return ProfilerResponse(table, CreateTableProfileRequest)
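The two-phase ordering in the call chain above — parallel-friendly static metrics first, then composed metrics derived from their results — can be sketched as plain Python. This is an illustrative toy, not the actual `Profiler.process()` code; the function names and the hard-coded metric values are invented for the example.

```python
# Hypothetical sketch of the two-phase ordering in Profiler.process():
# phase 1 computes independent (static) metrics per column, phase 2
# derives composed metrics from phase-1 results without touching the DB.

def compute_static(column: str) -> dict:
    # Stand-in for a SQL aggregate round-trip (values are fabricated)
    return {"count": 100, "nullCount": 5}

def compute_composed(results: dict) -> dict:
    # Composed metrics only read prior results; no extra query
    null_ratio = results["nullCount"] / results["count"] if results["count"] else None
    return {**results, "nullRatio": null_ratio}

def profile(columns):
    # Phase 1: static metrics (parallelizable per column)
    per_column = {col: compute_static(col) for col in columns}
    # Phase 2: composed metrics, sequential, using phase-1 output
    return {col: compute_composed(res) for col, res in per_column.items()}

profile(["id", "email"])
```

The key design point this illustrates: composed metrics never see the database, only the results dictionary produced by the static phase.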

Threading Model

The profiler uses a thread pool to parallelize metric computation across columns:
┌──────────────────────────────────────────────────┐
│ ThreadPoolExecutor (5–20 threads)                │
│                                                  │
│  Thread 1: table metrics (RowCount, ColCount)    │
│  Thread 2: column "id" static metrics            │
│  Thread 3: column "name" static metrics          │
│  Thread 4: column "email" static metrics         │
│  Thread 5: system metrics                        │
│  ...                                             │
└──────────────────────────────────────────────────┘

          ▼  all threads complete
┌──────────────────────────────────────────────────┐
│ Sequential: composed → hybrid per column         │
└──────────────────────────────────────────────────┘
Each thread gets its own QueryRunner instance (_create_thread_safe_runner()) with a dedicated database session. Thread count scales dynamically based on the number of tasks (min 5, max 20).
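The per-thread runner idea behind `_create_thread_safe_runner()` can be sketched with stdlib `threading.local`: each worker lazily builds its own "runner" the first time it needs one. This is a minimal sketch, assuming the real runner wraps a dedicated database session; the names and the string stand-ins are invented.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()

def get_thread_runner() -> str:
    # Lazily create one runner per worker thread (stand-in for a
    # QueryRunner with its own DB session)
    if not hasattr(_local, "runner"):
        _local.runner = f"runner-{threading.get_ident()}"
    return _local.runner

def compute_metric_in_thread(task: str) -> tuple[str, str]:
    # Each task computes against its thread's own runner
    return task, get_thread_runner()

tasks = ["table", "col:id", "col:name", "system"]
# Thread count scales with the task count, clamped to the 5..20 range
max_workers = min(20, max(5, len(tasks)))
with ThreadPoolExecutor(max_workers=max_workers) as pool:
    results = dict(pool.map(compute_metric_in_thread, tasks))
```

Thread-local sessions avoid sharing a single database connection across concurrent metric queries, which most DB-API drivers do not allow.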

Metric Filtering

Not all metrics apply to all columns. MetricFilter (processor/metric_filter.py) selects metrics based on:
  1. Column data type — numeric metrics skip string columns (uses orm/registry.py classifiers like is_quantifiable(), is_concatenable())
  2. Global profiler config — admin can enable/disable specific metrics
  3. Table-level config — per-table metric overrides
  4. Database service type — some metrics only work on specific databases
# orm/registry.py type classifiers
is_quantifiable(col_type)   # → numeric metrics (Mean, StdDev, Sum, ...)
is_concatenable(col_type)   # → string metrics (MinLength, MaxLength, ...)
is_date_time(col_type)      # → temporal metrics
NOT_COMPUTE                 # → set of types to skip entirely (ARRAY, JSON, ...)
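The type-based part of this filtering can be approximated with simple predicates. The sketch below is illustrative only: the type sets and metric names are examples, not the real `orm/registry.py` classifiers or the actual `MetricFilter` logic.

```python
# Example type sets (assumed, not the real registry contents)
QUANTIFIABLE = {"INT", "BIGINT", "FLOAT", "DOUBLE", "DECIMAL"}
CONCATENABLE = {"STRING", "VARCHAR", "TEXT"}
NOT_COMPUTE = {"ARRAY", "JSON", "STRUCT"}

def is_quantifiable(col_type: str) -> bool:
    return col_type in QUANTIFIABLE

def is_concatenable(col_type: str) -> bool:
    return col_type in CONCATENABLE

def select_metrics(col_type: str) -> list[str]:
    if col_type in NOT_COMPUTE:
        return []  # skip ARRAY/JSON/... entirely
    metrics = ["nullCount", "distinctCount"]  # applies to every computable type
    if is_quantifiable(col_type):
        metrics += ["mean", "stddev", "sum"]
    if is_concatenable(col_type):
        metrics += ["minLength", "maxLength"]
    return metrics
```

The real filter also consults the global, table-level, and service-type configuration listed above before a metric makes it into a `ThreadPoolMetrics` batch.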

ORM Layer

The orm/ directory bridges SQLAlchemy types with OpenMetadata’s type system:
  • registry.py — PythonDialects maps service types to SQLAlchemy dialects; CustomTypes handles special types (UUID, BYTES, ARRAY)
  • converter/ — database-specific converters (BigQuery, Snowflake, etc.) that map SQLAlchemy column types to OpenMetadata column types
  • functions/ — custom SQL functions used by metrics (e.g., SumFn, LenFn) that handle dialect differences

Database Backend Pluggability

Each database can override the default profiler behavior:
interface/sqlalchemy/
├── profiler_interface.py              # SQAProfilerInterface (base SQL implementation)
├── bigquery/profiler_interface.py     # BigQuery overrides
├── snowflake/profiler_interface.py    # Snowflake overrides
├── databricks/profiler_interface.py   # Databricks overrides
├── trino/profiler_interface.py        # Trino overrides
└── ...
System metrics are also dialect-specific:
metrics/system/
├── system.py                          # SystemMetricsRegistry (auto-discovers impls)
├── bigquery/system.py                 # BigQuery: INFORMATION_SCHEMA queries
├── snowflake/system.py                # Snowflake: ACCOUNT_USAGE queries
├── redshift/system.py                 # Redshift: STL_INSERT, STL_DELETE queries
└── databricks/system.py               # Databricks: table history queries
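The auto-discovery that `SystemMetricsRegistry` implies can be approximated with a plain registration decorator plus a default fallback. This is a hedged sketch, not the actual registry API; the function names and return strings are invented.

```python
# Dialect -> implementation lookup table, filled by the decorator below
SYSTEM_METRIC_IMPLS = {}

def register(dialect: str):
    def wrap(fn):
        SYSTEM_METRIC_IMPLS[dialect] = fn
        return fn
    return wrap

@register("snowflake")
def snowflake_system_metrics():
    return "query ACCOUNT_USAGE"

@register("redshift")
def redshift_system_metrics():
    return "query STL_INSERT / STL_DELETE"

def default_system_metrics():
    return "no system metrics for this dialect"

def get_system_metrics(dialect: str):
    # Fall back to the default when no dialect-specific override exists
    return SYSTEM_METRIC_IMPLS.get(dialect, default_system_metrics)()
```

The same registry-with-fallback shape lets a new database backend plug in system metrics without touching the dispatch code.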

Profiler Configuration

The profiler is configured via ProfilerProcessorConfig (api/models.py):
ProfilerProcessorConfig:
    profiler: ProfilerDef              # Which metrics to run, timeout
    tableConfig: List[TableConfig]     # Per-table overrides (columns, metrics)
    schemaConfig: List[...]            # Schema-level filters
    databaseConfig: List[...]          # Database-level filters

ProfilerDef:
    name: str
    timeout_seconds: int               # Per-table timeout
    metrics: List[ValidMetric]         # Validated against Metrics registry
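A minimal sketch of the "validated against the Metrics registry" step, using a stdlib dataclass. `KNOWN_METRICS` and the error handling are assumptions for illustration, not the real `api/models.py` code (which uses Pydantic models).

```python
from dataclasses import dataclass, field

# Assumed stand-in for the Metrics registry names
KNOWN_METRICS = {"mean", "stddev", "nullCount", "distinctCount"}

@dataclass
class ProfilerDef:
    name: str
    timeout_seconds: int                  # Per-table timeout
    metrics: list = field(default_factory=list)

    def __post_init__(self):
        # Reject metric names that are not in the registry
        unknown = set(self.metrics) - KNOWN_METRICS
        if unknown:
            raise ValueError(f"Unknown metrics: {sorted(unknown)}")

ProfilerDef(name="my_profiler", timeout_seconds=3600, metrics=["mean", "nullCount"])
```

Validating metric names at config-load time surfaces typos before any profiling work is scheduled.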

Adding a New Metric

1. Choose the metric type

Decide which base class fits your metric:

| If your metric… | Extend |
| --- | --- |
| Is a SQL aggregate (one value per column) | StaticMetric |
| Needs a full query (multiple rows) | QueryMetric |
| Is derived from other metrics (no DB query) | ComposedMetric |
| Needs both a query and prior metric results | HybridMetric |
| Uses window functions (percentiles) | WindowMetric |
2. Implement the metric class

Create a file in the appropriate metrics/ subdirectory. For a StaticMetric, implement fn() returning a SQLAlchemy expression:
from sqlalchemy import func

from metadata.profiler.metrics.core import StaticMetric, _label

class MyMetric(StaticMetric):
    @classmethod
    def name(cls):
        return "myMetric"

    @classmethod
    def is_col_metric(cls):
        return True

    @_label
    def fn(self, col, **kwargs):
        # func.my_aggregate stands in for your SQL aggregate function
        return func.my_aggregate(col)
For a ComposedMetric, implement fn() using prior results:
from metadata.profiler.metrics.core import ComposedMetric
from metadata.profiler.metrics.static.count import Count  # adjust path to your tree

class MyRatio(ComposedMetric):
    @classmethod
    def required_metrics(cls):
        return [MyMetric, Count]  # Must run before this metric

    @classmethod
    def fn(cls, res):
        my_val = res.get(MyMetric.name())
        count = res.get(Count.name())
        return my_val / count if count else None
3. Register in the Metrics enum

Add your metric to the Metrics enum in metrics/registry.py:
class Metrics(MetricRegistry):
    # ... existing metrics
    MY_METRIC = MyMetric
4. Add type filtering (if needed)

If your metric only applies to certain column types, update MetricFilter in processor/metric_filter.py to filter appropriately.

Pandas / DataFrame Support

For non-SQL sources (datalakes, files), the profiler uses a Pandas-based interface with an accumulator pattern (metrics/pandas_metric_protocol.py):
from typing import Protocol, TypeVar
from pandas import DataFrame

T, R = TypeVar("T"), TypeVar("R")

class PandasComputation(Protocol[T, R]):
    def create_accumulator(self) -> T: ...                         # Init empty container
    def update_accumulator(self, acc: T, df: DataFrame) -> T: ...  # Process chunk
    def aggregate_accumulator(self, acc: T) -> R: ...              # Final result
This three-step pattern enables streaming computation over large datasets that don’t fit in memory — each DataFrame chunk updates the accumulator, and the final result is computed after all chunks are processed.
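A concrete (hypothetical) mean computation following the three-step pattern might look like the sketch below. Plain lists of floats stand in for pandas DataFrame chunks so the example stays dependency-free; the real interface passes DataFrames, and `MeanComputation` is an invented class, not one from the codebase.

```python
from typing import Iterable, Optional

class MeanComputation:
    def create_accumulator(self) -> tuple[float, int]:
        # (running sum, running count) — small and chunk-independent
        return (0.0, 0)

    def update_accumulator(self, acc: tuple[float, int],
                           chunk: Iterable[float]) -> tuple[float, int]:
        total, count = acc
        values = list(chunk)
        return (total + sum(values), count + len(values))

    def aggregate_accumulator(self, acc: tuple[float, int]) -> Optional[float]:
        total, count = acc
        return total / count if count else None

comp = MeanComputation()
acc = comp.create_accumulator()
for chunk in ([1.0, 2.0], [3.0], [4.0, 5.0]):  # streamed chunks
    acc = comp.update_accumulator(acc, chunk)
mean = comp.aggregate_accumulator(acc)
```

Because the accumulator holds only a sum and a count, memory use stays constant no matter how many chunks stream through.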

Key Design Patterns

| Pattern | Where | Why |
| --- | --- | --- |
| Registry | Metrics enum, SystemMetricsRegistry, TypeRegistry | Central catalog with callable instantiation |
| Thread Pool | SQAProfilerInterface.get_all_metrics() | Parallelize metric computation across columns |
| Decorator | @add_props(table=table) | Dynamically attach table/column context to metric instances |
| Accumulator | PandasComputation protocol | Streaming computation for large DataFrames |
| Factory | ProfilerInterface.create(), ProfilerSourceFactory | Database-specific implementations via config |
| Strategy | _compute_*_metrics() methods dispatched by metric type | Each metric type has its own execution strategy |

Key Files Quick Reference

| What you want to do | Start here |
| --- | --- |
| Understand the workflow pipeline | workflow/profiler.py |
| See how tables are fetched and filtered | profiler/source/metadata.py |
| Read the core profiler orchestration | profiler/processor/core.py — Profiler class |
| See how metrics execute in threads | profiler/interface/sqlalchemy/profiler_interface.py — get_all_metrics() |
| Read metric base classes | profiler/metrics/core.py |
| Browse all built-in metrics | profiler/metrics/registry.py |
| See a static metric implementation | profiler/metrics/static/mean.py |
| See a composed metric | profiler/metrics/composed/null_ratio.py |
| Understand column type filtering | profiler/orm/registry.py — is_quantifiable(), etc. |
| See how results are published | ingestion/sink/metadata_rest.py — write_profiler_response() |
| Add database-specific overrides | profiler/interface/sqlalchemy/{dialect}/ |
All paths above are relative to ingestion/src/metadata/. For example, profiler/processor/core.py means ingestion/src/metadata/profiler/processor/core.py.