Sampler Module

The sampler module lives at ingestion/src/metadata/sampler/. It provides a unified interface for sampling data from tables across different database backends. Both the profiler and data quality modules depend on the sampler to get a representative subset of rows for computing metrics and running tests.

Directory Layout

sampler/
├── sampler_interface.py               # Abstract base class for all samplers
├── models.py                          # SampleConfig, TableConfig, SamplerResponse, etc.
├── config.py                          # Hierarchical config resolution helpers
├── partition.py                       # Partition detection and filtering utilities
├── processor.py                       # SamplerProcessor (PII / auto-classification entry point)
├── sqlalchemy/                        # SQL database samplers
│   ├── sampler.py                     # Base SQASampler
│   ├── postgres/sampler.py            # PostgreSQL (BERNOULLI / SYSTEM)
│   ├── bigquery/sampler.py            # BigQuery (struct columns, views)
│   ├── snowflake/sampler.py           # Snowflake (BERNOULLI / SYSTEM / ROW)
│   ├── databricks/sampler.py          # Databricks (catalog, array slicing)
│   ├── mssql/sampler.py               # SQL Server (TABLESAMPLE)
│   ├── azuresql/sampler.py            # Azure SQL (TABLESAMPLE + type filtering)
│   ├── trino/sampler.py               # Trino (NaN filtering, JSON handling)
│   ├── timescale/sampler.py           # TimescaleDB (compressed chunk awareness)
│   └── unitycatalog/sampler.py        # Unity Catalog (extends Databricks)
├── pandas/
│   └── sampler.py                     # Datalake sampler (S3, GCS, local files)
└── nosql/
    └── sampler.py                     # NoSQL sampler (MongoDB, DynamoDB, etc.)

How It Fits Together

The sampler sits between the data source and the profiler/data quality modules:
┌─────────────────────────────────────────────────────────────────┐
│  Profiler / Data Quality                                        │
│                                                                 │
│  profiler_source.create_profiler_interface(sampler=...)          │
│  test_suite_interface.__init__(sampler=...)                      │
│                                                                 │
│  Uses:                                                          │
│  • sampler.get_dataset()    → sampled CTE / DataFrame for       │
│                               metric computation or test runs   │
│  • sampler.fetch_sample_data() → actual row data for preview    │
└──────────────────────┬──────────────────────────────────────────┘
                       │ calls

┌─────────────────────────────────────────────────────────────────┐
│  SamplerInterface                                               │
│                                                                 │
│  Resolves: sample config + partition config + column filters    │
│  Builds: sampling query (CTE with random selection or           │
│           TABLESAMPLE clause)                                   │
│  Returns: sampled dataset                                       │
└──────────────────────┬──────────────────────────────────────────┘
                       │ queries

┌─────────────────────────────────────────────────────────────────┐
│  Database / Data Source                                          │
└─────────────────────────────────────────────────────────────────┘

Core Abstraction

SamplerInterface (sampler_interface.py) is the abstract base class all samplers extend.

Key Methods

Method                        Purpose
create(...)                   Factory method — creates a sampler with all dependencies
get_dataset()                 Returns the sampled dataset (CTE for SQL, DataFrame for pandas)
fetch_sample_data(columns)    Fetches actual row data as TableData
generate_sample_data()        Full pipeline: fetch → truncate → optionally upload
get_columns()                 Returns columns (respecting include/exclude filters)
raw_dataset                   Abstract property — the unsampled table/DataFrame
close()                       Cleans up connections
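As an illustration of the contract, here is a minimal mirror of the interface with a toy in-memory implementation. This is a simplified sketch: the real class in sampler_interface.py takes many more constructor parameters and returns richer OpenMetadata types, and the names `SamplerSketch` and `ListSampler` are invented for this example.

```python
from abc import ABC, abstractmethod

# Simplified, illustrative mirror of the SamplerInterface contract.
class SamplerSketch(ABC):
    def __init__(self, sample_data_count: int = 100):
        self.sample_data_count = sample_data_count

    @property
    @abstractmethod
    def raw_dataset(self):
        """The unsampled table/DataFrame."""

    @abstractmethod
    def get_dataset(self):
        """Return the sampled dataset."""

    def fetch_sample_data(self, columns=None):
        # Shared behavior: pull up to sample_data_count rows from the
        # sampled dataset (the real method returns TableData).
        return self.get_dataset()[: self.sample_data_count]

class ListSampler(SamplerSketch):
    """Toy sampler over an in-memory list of rows."""
    def __init__(self, rows, **kwargs):
        super().__init__(**kwargs)
        self._rows = rows

    @property
    def raw_dataset(self):
        return self._rows

    def get_dataset(self):
        return self._rows[::2]  # trivial "sampling": every other row

sampler = ListSampler(rows=list(range(10)), sample_data_count=3)
print(sampler.fetch_sample_data())  # [0, 2, 4]
```

The real subclasses follow the same shape: they implement `raw_dataset` and `get_dataset()` for their backend and inherit the shared fetch/generate flow.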

Constructor Parameters

SamplerInterface(
    service_connection_config,    # Database connection config
    ometa_client,                 # OpenMetadata API client
    entity,                       # Table entity being sampled
    sample_config,                # SampleConfig (percentage, rows, method)
    partition_details,            # PartitionProfilerConfig (optional)
    sample_query,                 # Custom SQL query (optional)
    include_columns,              # Columns to include (optional)
    exclude_columns,              # Columns to exclude (optional)
    sample_data_count,            # Number of rows to fetch (default: 100)
    storage_config,               # External storage for sample data (optional)
)

Data Models

Defined in models.py:
SampleConfig
  ├── profileSample: float         # Percentage (e.g., 50.0) or row count
  ├── profileSampleType: PERCENTAGE | ROWS
  ├── samplingMethodType: BERNOULLI | SYSTEM   # Database-specific method
  └── randomizedSample: bool       # Whether to randomize row selection

TableConfig
  ├── fullyQualifiedName: str
  ├── profileSample / profileSampleType / samplingMethodType
  ├── profileQuery: str            # Custom SQL for sampling
  ├── partitionConfig: ...         # Partition filtering
  └── columnConfig: ColumnConfig   # Include/exclude columns

SamplerResponse                    # Returned by SamplerProcessor
  ├── entity: Table
  ├── sample_data: SampleData
  └── column_tags: List            # For PII / auto-classification
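A rough dataclass sketch of SampleConfig to make the shape concrete. The real models in models.py are pydantic models with validation; field names match the listing above, but the defaults shown here are assumptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional

class ProfileSampleType(str, Enum):
    PERCENTAGE = "PERCENTAGE"
    ROWS = "ROWS"

@dataclass
class SampleConfig:
    profileSample: Optional[float] = None            # percentage or row count
    profileSampleType: ProfileSampleType = ProfileSampleType.PERCENTAGE
    samplingMethodType: Optional[str] = None         # "BERNOULLI" or "SYSTEM"
    randomizedSample: bool = True

cfg = SampleConfig(profileSample=50.0)
print(cfg.profileSampleType.value)  # PERCENTAGE
```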

Sampling Strategies

Percentage-Based Sampling

The default strategy. Takes X% of rows from the table. With randomization (default):
-- Creates a CTE with a random label, then filters
WITH sampled AS (
    SELECT *, RANDOM() AS random_label FROM table
)
SELECT * FROM sampled WHERE random_label <= 0.50  -- 50%
With TABLESAMPLE (database-specific):
-- PostgreSQL BERNOULLI
SELECT * FROM table TABLESAMPLE BERNOULLI(50)

-- BigQuery
SELECT * FROM table TABLESAMPLE SYSTEM (50 PERCENT)

-- SQL Server
SELECT * FROM table TABLESAMPLE (50 PERCENT)
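The percentage strategies above can be sketched as a small query builder. This is illustrative string building only; the real SQASampler composes SQLAlchemy selectables rather than raw SQL strings.

```python
def percentage_sample_query(table: str, percent: float, method: str = None) -> str:
    # method: a native TABLESAMPLE keyword such as "BERNOULLI" or "SYSTEM";
    # None falls back to the portable random-label CTE.
    if method:
        return f"SELECT * FROM {table} TABLESAMPLE {method}({percent})"
    return (
        f"WITH sampled AS (SELECT *, RANDOM() AS random_label FROM {table}) "
        f"SELECT * FROM sampled WHERE random_label <= {percent / 100}"
    )

print(percentage_sample_query("orders", 50, method="BERNOULLI"))
# SELECT * FROM orders TABLESAMPLE BERNOULLI(50)
```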

Row-Count-Based Sampling

Takes exactly N rows. With randomization:
SELECT * FROM table ORDER BY RANDOM() LIMIT 1000
Without randomization:
SELECT * FROM table LIMIT 1000
Snowflake row-based:
SELECT * FROM table TABLESAMPLE ROW (1000 ROWS)
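Row-count sampling follows the same pattern; a minimal sketch (again illustrative, since the real sampler builds these through SQLAlchemy):

```python
def row_count_sample_query(table: str, n: int, randomized: bool = True) -> str:
    # Randomized: shuffle the whole table, then take N rows.
    if randomized:
        return f"SELECT * FROM {table} ORDER BY RANDOM() LIMIT {n}"
    return f"SELECT * FROM {table} LIMIT {n}"

print(row_count_sample_query("orders", 1000))
# SELECT * FROM orders ORDER BY RANDOM() LIMIT 1000
```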

Database-Specific Overrides

Each database sampler can override set_tablesample() to use the database’s native sampling:
Database                 Sampling Method                       Notes
PostgreSQL               BERNOULLI or SYSTEM                   Configurable via samplingMethodType
BigQuery                 SYSTEM (X PERCENT)                    No TABLESAMPLE for views
Snowflake                BERNOULLI, SYSTEM, or ROW (N ROWS)    Row-based sampling supported
SQL Server / Azure SQL   X PERCENT or X ROWS                   No TABLESAMPLE for views
Databricks               CTE-based (no native TABLESAMPLE)     Array column slicing to prevent OOM
Trino                    CTE-based                             NaN filtering for float columns
TimescaleDB              PostgreSQL BERNOULLI/SYSTEM           Restricts to uncompressed chunks only

Partition Handling

partition.py detects and configures partition filtering so the sampler only reads relevant partitions:
get_partition_details(entity)
  ├── BigQuery time-unit partitions → filter by partition column
  ├── BigQuery ingestion-time partitions → filter by _PARTITIONDATE
  ├── BigQuery integer-range partitions → filter by range
  ├── Athena injected partitions → validate against profiler config
  └── Other databases → use partition config from table/profiler config
When partitions are configured, the sampler creates a CTE filtered by the partition predicate before applying sampling:
WITH partitioned AS (
    SELECT * FROM table WHERE partition_col >= '2024-01-01'
)
-- Then sample from the partitioned CTE
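How the partition CTE composes with the sampling CTE can be sketched as follows. This is illustrative string building; the real code chains SQLAlchemy CTEs.

```python
def partitioned_sample_query(table: str, predicate: str, percent: float) -> str:
    # Filter to the relevant partitions first, then sample from that CTE.
    return (
        f"WITH partitioned AS (SELECT * FROM {table} WHERE {predicate}), "
        f"sampled AS (SELECT *, RANDOM() AS random_label FROM partitioned) "
        f"SELECT * FROM sampled WHERE random_label <= {percent / 100}"
    )

query = partitioned_sample_query("events", "partition_col >= '2024-01-01'", 50)
```

Ordering matters: filtering before sampling means the sample percentage applies to the partition's rows, not the whole table.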

Configuration Resolution

config.py provides hierarchical config lookup (table → schema → database → default):
# Resolution order (most specific wins):
get_profile_sample_config(entity, schema_config, database_config, default_config)
  1. Check tableConfig for matching FQN
  2. Check schemaConfig for matching schema
  3. Check databaseConfig for matching database
  4. Fall back to default sample config
This means an admin can set a global “sample 50% of all tables” default, override it to “sample 10% for schema X”, and further override to “sample 1000 rows for table Y”.
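The resolution order can be sketched as a plain lookup. The function and argument names here are illustrative, not the exact config.py signatures, and the FQN layout (service.database.schema.table) is an assumption.

```python
def resolve_sample_config(table_fqn, table_configs, schema_configs,
                          database_configs, default_config):
    # Most specific wins: table > schema > database > default.
    _, database, schema, _ = table_fqn.split(".")
    if table_fqn in table_configs:
        return table_configs[table_fqn]
    if schema in schema_configs:
        return schema_configs[schema]
    if database in database_configs:
        return database_configs[database]
    return default_config

cfg = resolve_sample_config(
    "svc.db.sales.orders",
    table_configs={},                                  # no table override
    schema_configs={"sales": {"profileSample": 10}},   # schema override wins
    database_configs={},
    default_config={"profileSample": 50},
)
print(cfg)  # {'profileSample': 10}
```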

Notable Database-Specific Behavior

TimescaleDB — Compressed Chunk Awareness

TimescaleDB compresses old data into compressed chunks. Decompressing during profiling would be extremely expensive. The TimescaleDB sampler:
  1. Queries TimescaleDB metadata to find the boundary between compressed and uncompressed chunks
  2. Adds a filter WHERE time_col >= uncompressed_boundary to the sample query
  3. Only samples from uncompressed (recent) data
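The effect on the sample source can be sketched as below; the column name and boundary value are illustrative, and the real sampler derives the boundary from TimescaleDB's chunk metadata.

```python
def uncompressed_only(table: str, time_col: str, boundary: str) -> str:
    # Only recent, uncompressed chunks are scanned; sampling then applies
    # on top of this filtered source.
    return f"SELECT * FROM {table} WHERE {time_col} >= '{boundary}'"

print(uncompressed_only("metrics", "ts", "2024-06-01 00:00:00"))
# SELECT * FROM metrics WHERE ts >= '2024-06-01 00:00:00'
```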

Databricks — Array Column Slicing

Large array columns can cause OOM errors. The Databricks sampler:
  1. Detects CustomArray type columns
  2. Replaces them with slice(col, 1, N) in the SELECT to limit array elements
  3. Converts numpy arrays back to Python lists in results
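The projection rewrite can be sketched as follows; the helper name and the element cap are assumptions for illustration.

```python
def build_select_columns(columns, array_columns, max_elements=100):
    # Replace array columns with slice(col, 1, N) so oversized arrays
    # are truncated at the source instead of loaded whole.
    exprs = []
    for col in columns:
        if col in array_columns:
            exprs.append(f"slice({col}, 1, {max_elements}) AS {col}")
        else:
            exprs.append(col)
    return ", ".join(exprs)

print(build_select_columns(["id", "tags"], array_columns={"tags"}, max_elements=10))
# id, slice(tags, 1, 10) AS tags
```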

BigQuery — Struct Column Handling

BigQuery struct columns (nested fields like address.city) require special handling:
  1. Detects struct columns via _handle_struct_columns()
  2. Builds queries that properly reference nested fields
  3. Handles project ID correction from entity database name

Trino — NaN Filtering

Trino float columns can contain NaN values that break downstream processing:
  1. Identifies float columns via FLOAT_SET registry
  2. Wraps float columns in CASE WHEN IS_NAN(col) THEN NULL ELSE col END
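The rewrite can be sketched per column (simplified; the real code decides `is_float` by checking column types against the FLOAT_SET registry):

```python
def nan_safe_column(col: str, is_float: bool) -> str:
    # NaN values become NULL so downstream metric computation doesn't break.
    if is_float:
        return f"CASE WHEN IS_NAN({col}) THEN NULL ELSE {col} END AS {col}"
    return col

print(nan_safe_column("price", is_float=True))
# CASE WHEN IS_NAN(price) THEN NULL ELSE price END AS price
```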

Pandas / Datalake Sampler

For non-SQL sources, DatalakeSampler works with DataFrames:
# Percentage-based
df.sample(frac=profile_sample / 100)

# Row-count-based
df.sample(n=profile_sample)
Supports:
  • Partitioned DataFrames via get_partitioned_df()
  • Custom queries via get_sampled_query_dataframe()
  • Chunked processing via DataFrame iterators
  • NaN value filtering
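The same two modes, shown over plain lists so the behavior is easy to see without pandas (the real sampler calls DataFrame.sample; the helper name here is illustrative):

```python
import random

def sample_rows(rows, profile_sample, sample_type="PERCENTAGE", seed=None):
    # Mirrors df.sample(frac=...) and df.sample(n=...) over a plain list.
    rng = random.Random(seed)
    if sample_type == "PERCENTAGE":
        k = round(len(rows) * profile_sample / 100)
    else:  # ROWS
        k = min(int(profile_sample), len(rows))
    return rng.sample(rows, k)

rows = list(range(100))
print(len(sample_rows(rows, 50)))          # 50
print(len(sample_rows(rows, 10, "ROWS")))  # 10
```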

NoSQL Sampler

For NoSQL databases (MongoDB, DynamoDB), NoSQLSampler:
  • Uses NoSQLAdaptor to abstract database-specific operations
  • Converts percentage to row count: num_rows * (profileSample / 100)
  • Calls client.scan(limit=N) for sampling
  • Transposes list-of-dicts format into columnar TableData
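The transposition step can be sketched as below (simplified; the real code builds an OpenMetadata TableData object rather than a dict):

```python
def to_table_data(records):
    # Turn list-of-dicts (NoSQL scan output) into columnar columns/rows;
    # missing keys become None, matching a ragged document store.
    columns = sorted({key for rec in records for key in rec})
    rows = [[rec.get(col) for col in columns] for rec in records]
    return {"columns": columns, "rows": rows}

print(to_table_data([{"a": 1, "b": 2}, {"a": 3}]))
# {'columns': ['a', 'b'], 'rows': [[1, 2], [3, None]]}
```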

Adding a Database-Specific Sampler

Step 1: Create the sampler file

Create sampler/sqlalchemy/{dialect}/sampler.py. Extend SQASampler and override set_tablesample():
from sqlalchemy import func

from metadata.sampler.sqlalchemy.sampler import SQASampler

class MyDBSampler(SQASampler):
    def set_tablesample(self, selectable):
        # Add your database's TABLESAMPLE clause
        return selectable.tablesample(
            func.bernoulli(self.sample_config.profileSample)
        )
Step 2: Register the sampler

The sampler is discovered via import_sampler_class() which resolves the class dynamically based on the database service type. Ensure your module path follows the convention:
metadata/sampler/sqlalchemy/{dialect}/sampler.py
Step 3: Handle edge cases

Override additional methods if your database needs special handling:
  • _base_sample_query() — customize the core sampling query
  • fetch_sample_data() — customize how rows are fetched
  • _handle_array_column() — if your database has array types that need slicing

Key Design Patterns

Pattern               Where                                       Why
Factory               SamplerInterface.create()                   Unified creation with config resolution
Strategy              set_tablesample() overrides per database    Each database has its own sampling syntax
Template Method       generate_sample_data()                      Shared flow (fetch → truncate → upload) with subclass hooks
Hierarchical Config   config.py helpers                           Table → schema → database → default resolution
CTE-based Sampling    SQASampler.get_dataset()                    Clean separation of partition filtering, sampling, and query

Key Files Quick Reference

What you want to do                     Start here
Understand the base interface           sampler_interface.py
See sampling config models              models.py
Understand config resolution            config.py
See partition handling                  partition.py
Read the base SQL sampler               sqlalchemy/sampler.py
See a database-specific sampler         sqlalchemy/postgres/sampler.py (simplest)
See complex database handling           sqlalchemy/bigquery/sampler.py or sqlalchemy/timescale/sampler.py
Understand DataFrame sampling           pandas/sampler.py
See NoSQL sampling                      nosql/sampler.py
See how the profiler uses the sampler   profiler/source/database/base/profiler_source.py
See how DQ uses the sampler             data_quality/runner/base_test_suite_source.py
All paths above are relative to ingestion/src/metadata/. For example, sqlalchemy/sampler.py means ingestion/src/metadata/sampler/sqlalchemy/sampler.py.