# Data Quality Module | Technical Architecture

> Technical deep dive into the OpenMetadata data quality module — test execution pipeline, validator architecture, and extension points.

The data quality module lives at `ingestion/src/metadata/data_quality/`. It runs **test cases** against database tables (or DataFrames) and publishes results back to the OpenMetadata server.

This guide walks through the architecture top-down: from how a workflow is triggered, through the execution pipeline, down to individual validators.

## Directory Layout

```
data_quality/
├── api/
│   └── models.py                  # DTOs: TableAndTests, TestCaseResults, TestCaseResultResponse
├── builders/
│   └── validator_builder.py       # Factory that creates validator instances
├── interface/                     # Database abstraction layer
│   ├── test_suite_interface.py    # Abstract base
│   ├── sqlalchemy/                # SQL databases (+ Snowflake, Databricks overrides)
│   └── pandas/                    # Datalake / DataFrame-based
├── processor/
│   └── test_case_runner.py        # Workflow processor — orchestrates test execution
├── runner/
│   ├── core.py                    # DataTestsRunner — runs a single test via interface
│   └── base_test_suite_source.py  # Creates the interface + sampler for a table
├── source/
│   └── test_suite.py              # Fetches tables and test cases from OM API
└── validations/                   # All validation logic
    ├── base_test_handler.py       # Abstract BaseTestValidator (template method)
    ├── models.py                  # Runtime parameter models
    ├── utils.py                   # Helpers
    ├── impact_score.py            # Dimensional result scoring
    ├── checkers/                  # Condition checkers (between bounds, etc.)
    ├── mixins/
    │   ├── protocols.py           # HasValidatorContext protocol
    │   ├── sqa_validator_mixin.py # SQLAlchemy query helpers
    │   └── pandas_validator_mixin.py
    ├── runtime_param_setter/      # Resolve dynamic parameters at execution time
    ├── column/                    # Column-level validators
    │   ├── base/                  #   Abstract implementations
    │   ├── sqlalchemy/            #   SQL implementations
    │   └── pandas/                #   DataFrame implementations
    └── table/                     # Table-level validators (same structure)
```

## Workflow Pipeline

Data quality follows the standard **Source → Processor → Sink** pattern defined in `workflow/data_quality.py`:

```
┌──────────────────────────────────────────────────────────┐
│  TestSuiteSource                                         │
│  source/test_suite.py                                    │
│                                                          │
│  Fetches from OM API:                                    │
│  • Table entity                                          │
│  • Database service connection                           │
│  • Test cases linked to the test suite                   │
│                                                          │
│  Yields: TableAndTests                                   │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  TestCaseRunner                                          │
│  processor/test_case_runner.py                           │
│                                                          │
│  For each TableAndTests:                                 │
│  1. Filter to OM platform tests only                     │
│  2. Remove disabled and incompatible tests               │
│  3. Run each test case → collect results                 │
│                                                          │
│  Yields: TestCaseResults                                 │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  MetadataRestSink                                        │
│  ingestion/sink/metadata_rest.py                         │
│                                                          │
│  For each TestCaseResultResponse:                        │
│    POST /api/v1/dataQuality/testCases/{id}/testCaseResult│
└──────────────────────────────────────────────────────────┘
```
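
The three stages can be pictured as chained generators. The following is a minimal sketch with stand-in types, not the actual OpenMetadata classes: `TableBatch` and `ResultBatch` play the roles of `TableAndTests` and `TestCaseResults`, and `fetch_tables`, `run_tests`, and `publish` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterable, Iterator, List, Tuple


@dataclass
class TableBatch:
    """Stand-in for what the source yields (hypothetical fields)."""
    table: str
    test_cases: List[str] = field(default_factory=list)


@dataclass
class ResultBatch:
    """Stand-in for what the processor yields (hypothetical fields)."""
    table: str
    results: List[str] = field(default_factory=list)


def fetch_tables() -> Iterator[TableBatch]:
    # Source: in the real workflow this data comes from the OM API.
    yield TableBatch("orders", ["not_null(id)", "unique(id)"])
    yield TableBatch("users", ["not_null(email)"])


def run_tests(items: Iterable[TableBatch]) -> Iterator[ResultBatch]:
    # Processor: one result per test case, grouped by table.
    for item in items:
        yield ResultBatch(item.table, [f"{tc}: Success" for tc in item.test_cases])


def publish(batches: Iterable[ResultBatch], send: Callable[[str, str], None]) -> int:
    # Sink: hand each individual result to `send` (a stand-in for the REST call).
    count = 0
    for batch in batches:
        for result in batch.results:
            send(batch.table, result)
            count += 1
    return count


sent: List[Tuple[str, str]] = []
total = publish(run_tests(fetch_tables()), lambda table, result: sent.append((table, result)))
```

Because each stage consumes the previous one lazily, a table's results can be published before the next table is fetched.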

## Test Execution Call Chain

This is the most important sequence to understand. When `TestCaseRunner` processes a test case, the call passes through four layers (processor, runner, interface, validator):

```python
# 1. PROCESSOR calls the runner
TestCaseRunner._run_test_case(test_case)
    └── DataTestsRunner.run_and_handle(test_case)        # runner/core.py

# 2. RUNNER delegates to the interface
        └── TestSuiteInterface.run_test_case(test_case)   # interface/test_suite_interface.py

# 3. INTERFACE resolves runtime params and builds the validator
            ├── RuntimeParameterSetterFactory.get_runtime_param_setters(test_def_fqn)
            │     # Adds dynamic params (e.g., table references for tableDiff)
            │
            ├── _get_validator_builder(test_case, entity_type)
            │     ├── Fetch TestDefinition from OM API
            │     ├── Read validatorClass field (e.g., "columnValuesToBeNotNull")
            │     └── return ValidatorBuilder(runner, test_case, test_def)
            │
            ├── validator_builder.set_runtime_params(param_setters)
            │
            └── validator = validator_builder.validator
                  # Dynamically imports the correct validator class

# 4. VALIDATOR executes the test
                  └── validator.run_validation()           # validations/base_test_handler.py
                        ├── _run_validation()              # Subclass: query + evaluate
                        ├── _run_dimensional_validation()  # Optional: GROUP BY analysis
                        └── return TestCaseResult
```
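
The ordering in step 3 (create the builder, apply runtime parameters, then obtain the validator) can be sketched as follows. This is an illustration under assumptions, not the real `ValidatorBuilder`: the `*Sketch` classes, the callable-based setters, and the lazy `validator` property are all hypothetical.

```python
from typing import Callable, Dict, List, Optional


class ValidatorSketch:
    """Hypothetical validator that only reports what it was built with."""

    def __init__(self, test_case: str, runtime_params: Dict[str, str]) -> None:
        self.test_case = test_case
        self.runtime_params = runtime_params

    def run_validation(self) -> str:
        return f"{self.test_case} ran with {sorted(self.runtime_params)}"


class ValidatorBuilderSketch:
    """Collects inputs first and creates the validator only when asked for it."""

    def __init__(self, test_case: str) -> None:
        self.test_case = test_case
        self.runtime_params: Dict[str, str] = {}
        self._validator: Optional[ValidatorSketch] = None

    def set_runtime_params(self, setters: List[Callable[[], Dict[str, str]]]) -> None:
        # Each setter contributes parameters resolved at execution time.
        for setter in setters:
            self.runtime_params.update(setter())

    @property
    def validator(self) -> ValidatorSketch:
        # Assumption: built on first access, after runtime params are set.
        if self._validator is None:
            self._validator = ValidatorSketch(self.test_case, self.runtime_params)
        return self._validator


def run_test_case(test_case: str, setters: List[Callable[[], Dict[str, str]]]) -> str:
    builder = ValidatorBuilderSketch(test_case)
    builder.set_runtime_params(setters)
    return builder.validator.run_validation()


message = run_test_case("tableDiff", [lambda: {"table2": "db.schema.orders_copy"}])
```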

## Data Model

Three entities form the core model. All are defined as JSON Schemas in `openmetadata-spec/` and auto-generated into Python models:

```
TestDefinition
  ├── name: str                    # e.g., "columnValuesToBeNotNull"
  ├── entityType: COLUMN | TABLE
  ├── validatorClass: str          # Maps to Python class name
  ├── parameters: List[ParamDef]   # Parameter schema
  └── supportedDataTypes: List     # Column types this test applies to

TestCase
  ├── testDefinition: FK           # Points to a TestDefinition
  ├── entityLink: str              # Table or column being tested
  ├── parameterValues: List        # Actual parameter values
  ├── dimensionColumns: List       # Optional: columns for GROUP BY analysis
  └── topDimensions: int           # Max dimension groups to report

TestCaseResult
  ├── testCaseStatus: Success | Failed | Aborted
  ├── result: str                  # Human-readable message
  ├── testResultValue: List        # Metric name → value pairs
  ├── passedRows / failedRows
  └── dimensionResults: List       # Per-group results (optional)
```

## Validator Architecture

Validators use a **template method** pattern with **mixins** for database-specific logic.

### Class Hierarchy

```
BaseTestValidator                          # validations/base_test_handler.py
│   run_validation()                       #   Template method (calls hooks below)
│   _run_validation()                      #   Abstract — implement test logic
│   _evaluate_test_condition()             #   Abstract — pass/fail check
│   _format_result_message()               #   Abstract — human-readable output
│   _run_dimensional_validation()          #   Dimensional GROUP BY logic
│   _execute_dimensional_validation()      #   Abstract — build dimension query
│
├── Base implementations                   # validations/column/base/ or table/base/
│   e.g., BaseColumnValuesToBeNotNullValidator
│         Implements _run_validation, _evaluate_test_condition, _format_result_message
│         Leaves metric computation abstract
│
├── SQL implementations                    # validations/column/sqlalchemy/
│   e.g., ColumnValuesToBeNotNullValidator(Base + SQAValidatorMixin)
│         SQAValidatorMixin provides: get_column(), run_query_results()
│
└── Pandas implementations                 # validations/column/pandas/
    e.g., PandasColumnValuesToBeNotNullValidator(Base + PandasValidatorMixin)
          PandasValidatorMixin provides: get_column(), run_dataframe_results()
```
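
To make the layering concrete, here is a self-contained toy version of the same three levels. It is a sketch, not the module's code: the `*Sketch` classes, `ListBackendMixin`, and `_count_nulls()` are hypothetical, and a plain Python list stands in for a database or DataFrame.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class BaseValidatorSketch(ABC):
    """Template method: run_validation() fixes the flow, subclasses fill the hooks."""

    def run_validation(self) -> Dict[str, str]:
        null_count = self._run_validation()
        passed = self._evaluate_test_condition(null_count)
        return {
            "status": "Success" if passed else "Failed",
            "result": self._format_result_message(null_count),
        }

    @abstractmethod
    def _run_validation(self) -> int: ...

    @abstractmethod
    def _evaluate_test_condition(self, null_count: int) -> bool: ...

    @abstractmethod
    def _format_result_message(self, null_count: int) -> str: ...


class BaseNotNullSketch(BaseValidatorSketch):
    """Backend-agnostic test logic; the metric computation stays abstract."""

    def _run_validation(self) -> int:
        return self._count_nulls()

    def _evaluate_test_condition(self, null_count: int) -> bool:
        return null_count == 0

    def _format_result_message(self, null_count: int) -> str:
        return f"Found {null_count} null value(s)"

    @abstractmethod
    def _count_nulls(self) -> int: ...


class ListBackendMixin:
    """Plays the role of a data-access mixin, here over a plain list."""

    values: List[Optional[int]]

    def get_column(self) -> List[Optional[int]]:
        return self.values


class ListNotNullSketch(BaseNotNullSketch, ListBackendMixin):
    def __init__(self, values: List[Optional[int]]) -> None:
        self.values = values

    def _count_nulls(self) -> int:
        return sum(1 for value in self.get_column() if value is None)


outcome = ListNotNullSketch([1, None, 3]).run_validation()
```

Supporting another backend in this toy model would only require a new mixin and a new `_count_nulls()`; the pass/fail rule and the message format stay in the shared base class.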

### How Validators Are Discovered

Validators are **dynamically imported** at runtime based on the `validatorClass` field from the `TestDefinition`:

```python
# ValidatorBuilder calls import_test_case_class() which resolves:
#   entity_type = "column" or "table"
#   runner_type = "sqlalchemy" or "pandas"
#   validator_class = from TestDefinition.validatorClass

module_path = f"metadata.data_quality.validations.{entity_type}.{runner_type}.{validator_class}"
# Example: metadata.data_quality.validations.column.sqlalchemy.columnValuesToBeNotNull
```

This means adding a new test is purely additive — no registry to update, no imports to add. Just place the file in the right directory.
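
The mechanism behind this kind of discovery is Python's `importlib`. The sketch below builds the dotted path from the three parts described above and then demonstrates the import step on a standard-library module, so it runs without the ingestion package installed. `build_module_path` and `load_module` are hypothetical helpers, not the signature of `import_test_case_class()`.

```python
import importlib
from types import ModuleType


def build_module_path(entity_type: str, runner_type: str, validator_class: str) -> str:
    """Compose the dotted module path from entity type, runner type, and class."""
    return (
        "metadata.data_quality.validations."
        f"{entity_type}.{runner_type}.{validator_class}"
    )


def load_module(module_path: str) -> ModuleType:
    """Import a module by dotted path; raises ModuleNotFoundError if it is absent."""
    return importlib.import_module(module_path)


path = build_module_path("column", "sqlalchemy", "columnValuesToBeNotNull")

# Demonstration on a standard-library module instead of a real validator module.
json_module = load_module("json")
decoder_cls = getattr(json_module, "JSONDecoder")
```

A consequence of this design is that a misnamed file or class surfaces only at run time, as an import or attribute error, rather than at startup.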

## Dimensional Analysis

When a test case has `dimensionColumns` configured, the validator runs the test **per group** using `GROUP BY`:

```
For dimensionColumns = ["region", "product"]:

1. Run GROUP BY region  → compute metrics per region value
2. Run GROUP BY product → compute metrics per product value
3. For each group:
   ├── Evaluate pass/fail
   ├── Calculate impact score
   └── Create DimensionResult
4. Keep top N groups by impact score, aggregate rest as "Others"
```
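
The steps above can be sketched in plain Python for a not-null style check. This is a simplified illustration: the helper names are hypothetical, grouping is done in memory rather than with SQL `GROUP BY`, and groups are ranked by failed-row count as a stand-in for the real impact score.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

Row = Dict[str, Optional[str]]
GroupStats = Dict[str, Tuple[int, int]]  # group value -> (failed rows, total rows)


def failures_by_group(rows: List[Row], dimension: str, column: str) -> GroupStats:
    """Count failed and total rows per value of one dimension column."""
    counts: Dict[str, List[int]] = defaultdict(lambda: [0, 0])
    for row in rows:
        group = str(row[dimension])
        counts[group][1] += 1
        if row[column] is None:
            counts[group][0] += 1
    return {group: (failed, total) for group, (failed, total) in counts.items()}


def keep_top_groups(groups: GroupStats, top_n: int) -> GroupStats:
    """Keep the top_n groups by failed rows and fold the rest into "Others"."""
    ranked = sorted(groups.items(), key=lambda item: (-item[1][0], item[0]))
    kept = dict(ranked[:top_n])
    rest = [stats for _, stats in ranked[top_n:]]
    if rest:
        kept["Others"] = (sum(s[0] for s in rest), sum(s[1] for s in rest))
    return kept


rows: List[Row] = [
    {"region": "EU", "product": "A", "email": None},
    {"region": "EU", "product": "B", "email": None},
    {"region": "US", "product": "A", "email": "a@x.io"},
    {"region": "US", "product": "B", "email": None},
    {"region": "APAC", "product": "A", "email": "b@x.io"},
]

# Each dimension column is analysed independently, one grouping per column.
report = {
    dim: keep_top_groups(failures_by_group(rows, dim, "email"), top_n=2)
    for dim in ["region", "product"]
}
```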

**Impact score** (`validations/impact_score.py`) ranks which groups matter most:

```
impact = (failure_rate² × volume_factor × sample_weight) / 1.5
```

* `failure_rate²` — emphasizes high failure percentages
* `volume_factor` — tiered by row count (0.25 for \<10 rows → 1.5 for ≥100k)
* `sample_weight` — credibility discount for small samples
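
A direct transcription of the formula, with the two factors passed in rather than derived, since the exact tier boundaries and weighting rules live in `impact_score.py`. The function name and the assumption that `sample_weight` lies in the range 0 to 1 are illustrative.

```python
def impact_score(failure_rate: float, volume_factor: float, sample_weight: float) -> float:
    """Compute (failure_rate^2 * volume_factor * sample_weight) / 1.5."""
    if not 0.0 <= failure_rate <= 1.0:
        raise ValueError("failure_rate must be between 0 and 1")
    return (failure_rate ** 2) * volume_factor * sample_weight / 1.5


# Everything fails in a large group (volume_factor 1.5, full sample weight).
worst_case = impact_score(1.0, 1.5, 1.0)

# Everything fails in a tiny group (volume_factor 0.25).
small_group = impact_score(1.0, 0.25, 1.0)

# Half the rows fail in a large group.
half_failing = impact_score(0.5, 1.5, 1.0)
```

With the largest volume factor being 1.5, dividing by 1.5 appears to keep the score at or below 1.0 whenever `sample_weight` does not exceed 1. Squaring the failure rate means that a group failing half its rows scores a quarter, not half, of a fully failing group of the same size.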

## Runtime Parameter Setters

Some tests need parameters that can only be resolved at execution time. The `RuntimeParameterSetterFactory` (`validations/runtime_param_setter/param_setter_factory.py`) maps test definition names to setter classes:

| Test                        | Setter                                 | What It Does                                                        |
| --------------------------- | -------------------------------------- | ------------------------------------------------------------------- |
| `tableDiff`                 | `TableDiffParamsSetter`                | Fetches the comparison table, resolves key columns from constraints |
| `tableCustomSQLQuery`       | `TableCustomSQLQueryParamsSetter`      | Compiles SQL with runtime table references                          |
| `*RuleLibrarySqlExpression` | `RuleLibrarySqlExpressionParamsSetter` | Builds SQL from the rule library                                    |
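
One way to picture the factory is a lookup from test definition name to setter classes. The sketch below is an assumption about the shape, not the real implementation: the `*SetterSketch` classes and `get_setters()` are hypothetical, and the `*` prefix from the table is treated as a shell-style wildcard via `fnmatchcase`.

```python
from fnmatch import fnmatchcase
from typing import Dict, List, Type


class ParamSetterSketch:
    """Hypothetical base class for runtime parameter setters."""

    def describe(self) -> str:
        return type(self).__name__


class TableDiffSetterSketch(ParamSetterSketch):
    pass


class CustomSqlSetterSketch(ParamSetterSketch):
    pass


class RuleLibrarySetterSketch(ParamSetterSketch):
    pass


# Pattern -> setter class, mirroring the rows of the table above.
REGISTRY: Dict[str, Type[ParamSetterSketch]] = {
    "tableDiff": TableDiffSetterSketch,
    "tableCustomSQLQuery": CustomSqlSetterSketch,
    "*RuleLibrarySqlExpression": RuleLibrarySetterSketch,
}


def get_setters(test_definition_name: str) -> List[ParamSetterSketch]:
    """Return one setter instance per registry pattern matching the name."""
    return [
        setter_cls()
        for pattern, setter_cls in REGISTRY.items()
        if fnmatchcase(test_definition_name, pattern)
    ]


matched = [s.describe() for s in get_setters("columnRuleLibrarySqlExpression")]
unmatched = get_setters("columnValuesToBeNotNull")
```

Tests without an entry simply receive an empty list, so most validators run with only their static `parameterValues`.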

## Adding a New Test

<Steps>
  <Step title="Create the base validator">
    Create `validations/column/base/myNewTest.py` (or `table/base/` for table-level tests).

    Extend `BaseTestValidator` and implement:

    * `_run_validation()` — core test logic
    * `_evaluate_test_condition()` — return pass/fail
    * `_format_result_message()` — human-readable result string
  </Step>

  <Step title="Create the SQLAlchemy implementation">
    Create `validations/column/sqlalchemy/myNewTest.py`.

    Inherit from your base class **and** `SQAValidatorMixin`. The mixin gives you `get_column()`, `run_query_results()`, and `_execute_dimensional_validation()`.
  </Step>

  <Step title="Create the Pandas implementation">
    Create `validations/column/pandas/myNewTest.py`.

    Inherit from your base class **and** `PandasValidatorMixin`.
  </Step>

  <Step title="Register the TestDefinition">
    Create a `TestDefinition` in OpenMetadata with:

    * `validatorClass` matching your file name without the `.py` extension (e.g., `"myNewTest"`)
    * `entityType` set to `COLUMN` or `TABLE`
    * Parameter definitions for any inputs your test needs
  </Step>
</Steps>

<Info>
  No imports or registries need to be updated. The `ValidatorBuilder` discovers your validator automatically from the file path convention.
</Info>

## Key Design Patterns

| Pattern                 | Where                                               | Why                                                                     |
| ----------------------- | --------------------------------------------------- | ----------------------------------------------------------------------- |
| **Template Method**     | `BaseTestValidator.run_validation()`                | Shared flow (run → evaluate → format → dimensional) with subclass hooks |
| **Strategy via Mixins** | `SQAValidatorMixin`, `PandasValidatorMixin`         | Same test logic, different data access (SQL vs DataFrame)               |
| **Factory**             | `ValidatorBuilder`, `RuntimeParameterSetterFactory` | Decouple creation from usage                                            |
| **Dynamic Import**      | `import_test_case_class()`                          | Zero-config validator discovery                                         |
| **Protocol**            | `HasValidatorContext`                               | Structural typing for validator dependencies                            |

## Key Files Quick Reference

| What you want to do                       | Start here                                                           |
| ----------------------------------------- | -------------------------------------------------------------------- |
| Understand the workflow pipeline          | `workflow/data_quality.py`                                           |
| See how test cases are fetched            | `source/test_suite.py`                                               |
| See how tests are filtered and dispatched | `processor/test_case_runner.py`                                      |
| Understand validator creation             | `builders/validator_builder.py`                                      |
| Read the base validator logic             | `validations/base_test_handler.py`                                   |
| See a concrete SQL validator              | `validations/column/sqlalchemy/columnValuesToBeNotNull.py`           |
| See a concrete Pandas validator           | `validations/column/pandas/columnValuesToBeNotNull.py`               |
| Understand dimensional analysis           | `validations/base_test_handler.py` → `_run_dimensional_validation()` |
| Add runtime parameter resolution          | `validations/runtime_param_setter/param_setter_factory.py`           |
| See how results are published             | `ingestion/sink/metadata_rest.py` → `write_test_case_result_list()`  |

<Tip>
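  Paths that begin with `workflow/` or `ingestion/` are relative to `ingestion/src/metadata/`. All other paths are relative to the module root, `ingestion/src/metadata/data_quality/`. For example, `source/test_suite.py` means `ingestion/src/metadata/data_quality/source/test_suite.py`.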
</Tip>
