Chunk-Based Validation

For large datasets that don’t fit in memory, validate data in chunks:

Method 1: Manual Chunk Validation

import pandas as pd
from metadata.sdk.data_quality.dataframes.dataframe_validator import DataFrameValidator, ValidationResult

validator = DataFrameValidator()
validator.add_openmetadata_table_tests("Postgres.warehouse.staging.transactions")

results = []

# Process the file in chunks
for i, chunk in enumerate(pd.read_csv("large_file.csv", chunksize=10000)):
    # Validate chunk
    result = validator.validate(chunk)
    results.append(result)

    # Load if valid, otherwise stop
    if result.success:
        load_chunk_to_database(chunk)  # your own loading logic
    else:
        print(f"Validation failed on chunk {i}")
        rollback_all_chunks()  # your own rollback logic
        break

# Merge all results
final_result = ValidationResult.merge(*results)

# Publish aggregated results
final_result.publish("Postgres.warehouse.staging.transactions")

Method 2: Using the run() Method

The run() method provides a cleaner approach with automatic chunk handling:
from metadata.sdk.data_quality.dataframes.dataframe_validator import DataFrameValidator
import pandas as pd
from sqlalchemy import create_engine, Table, MetaData, insert, delete

# Configure validator
validator = DataFrameValidator()
validator.add_openmetadata_table_tests("Postgres.warehouse.staging.orders")

# Setup database connection
engine = create_engine("postgresql://user:pass@localhost/warehouse")

# Define success and failure handlers
def load_chunk(df, validation_result):
    """Load validated chunk to database"""
    # engine.begin() opens a transaction and commits it on success
    with engine.begin() as conn:
        table = Table("orders", MetaData(), autoload_with=conn)
        conn.execute(insert(table), df.to_dict(orient="records"))

def handle_failure(df, validation_result):
    """Roll back on validation failure"""
    with engine.begin() as conn:
        table = Table("orders", MetaData(), autoload_with=conn)
        conn.execute(delete(table))  # remove previously loaded chunks
    print(f"Validation failed: {validation_result}")

# Run validation on chunks
result = validator.run(
    pd.read_csv("orders.csv", chunksize=5000),
    on_success=load_chunk,
    on_failure=handle_failure
)

# Publish results
if result.success:
    print("✓ All chunks validated and loaded successfully")
else:
    print("✗ Validation failed - transaction rolled back")

result.publish("Postgres.warehouse.staging.orders")

Transaction-Safe Chunk Processing

Use a context manager to ensure atomic transactions:
from metadata.sdk.data_quality.dataframes.dataframe_validator import DataFrameValidator
import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, insert

class DatabaseSession:
    def __init__(self, connection_string, table_name):
        self.engine = create_engine(connection_string)
        self.table_name = table_name
        self._conn = None
        self._trans = None

    def __enter__(self):
        self._conn = self.engine.connect()
        self._trans = self._conn.begin()
        self.table = Table(self.table_name, MetaData(), autoload_with=self._conn)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Commit only if the transaction is still active
        # (rollback() may already have been called by the failure handler)
        if self._trans.is_active:
            if exc_type is not None:
                self._trans.rollback()
            else:
                self._trans.commit()
        self._conn.close()

    def load_chunk(self, df, validation_result):
        """Load chunk within the open transaction"""
        self._conn.execute(
            insert(self.table),
            df.to_dict(orient="records")
        )

    def rollback(self, df, validation_result):
        """Roll back the transaction on validation failure"""
        self._trans.rollback()

# Use in validation workflow
validator = DataFrameValidator()
validator.add_openmetadata_table_tests("Postgres.warehouse.staging.sales")

with DatabaseSession("postgresql://user:pass@localhost/warehouse", "sales") as session:
    result = validator.run(
        pd.read_csv("sales_data.csv", chunksize=10000),
        on_success=session.load_chunk,
        on_failure=session.rollback
    )

print(f"Validation {'succeeded' if result.success else 'failed'}")

Failure Modes

As of version 1.11.0.0 of the SDK, DataFrameValidator supports only one failure mode: short circuit.
from metadata.sdk.data_quality.dataframes import FailureMode

# SHORT_CIRCUIT (default): Stop on first test failure
result = validator.validate(df, mode=FailureMode.SHORT_CIRCUIT)

# In chunk-based processing, SHORT_CIRCUIT stops processing additional chunks
result = validator.run(
    data_chunks,
    on_success=load_chunk,
    on_failure=rollback,
    mode=FailureMode.SHORT_CIRCUIT  # Stop on first failed chunk
)
Future versions will add modes to report failing rows or skip failing batches.

Working with Validation Results

Accessing Test Results

result = validator.validate(df)

# Check overall success
if result.success:
    print("All tests passed")

# Iterate through individual test results
for test_case, test_result in result.test_cases_and_results:
    print(f"Test: {test_case.name.root}")
    print(f"Status: {test_result.testCaseStatus}")
    print(f"Details: {test_result.result}")

Merging Results from Multiple Chunks

from metadata.sdk.data_quality.dataframes import ValidationResult

results = []

for chunk in data_chunks:
    result = validator.validate(chunk)
    results.append(result)

# Merge all chunk results
merged_result = ValidationResult.merge(*results)

# Merged result contains aggregated information
print(f"Overall success: {merged_result.success}")

Publishing Results to OpenMetadata

After validation, publish results back to OpenMetadata for tracking and alerting:
result = validator.validate(df)

# Publish results to OpenMetadata
result.publish("Postgres.warehouse.staging.customer_data")
This enables:
  • Historical tracking of data quality trends
  • Alerting on validation failures
  • Visualization in OpenMetadata UI
  • Centralized data quality reporting
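
Publishing works for failed runs as well. Method 2 above publishes the result whether or not validation succeeded, which is what makes failure alerting possible. A minimal sketch of that pattern:
result = validator.validate(df)

# Publish first so a failed run is still tracked and can trigger alerts
result.publish("Postgres.warehouse.staging.customer_data")

if not result.success:
    raise RuntimeError("Data quality checks failed; see OpenMetadata for details")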

Important Considerations for Chunk-Based Validation

When using chunk-based validation, be aware of tests that require the full dataset:

Tests That Require Full Table

Some tests analyze the entire dataset and may produce incorrect results when run on chunks:
  • TableRowCountToBeBetween: Counts rows in each chunk, not the full dataset
  • TableRowCountToEqual: Validates chunk size, not full dataset size
  • ColumnValuesSumToBeBetween: Sums values per chunk, not across all data
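
The false positive is easy to reproduce with plain pandas. Using the 200-row example from the SDK warning below, every 50-row chunk individually satisfies a "max 100 rows" expectation:
import pandas as pd

df = pd.DataFrame({"id": range(200)})

# Each 50-row chunk passes a per-chunk "max 100 rows" check,
# even though the full table has 200 rows
for chunk in (df.iloc[i:i + 50] for i in range(0, len(df), 50)):
    assert len(chunk) <= 100  # false positive for a table-level row-count test
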
The SDK will issue a warning when such tests are detected:
WholeTableTestsWarning: Running tests that require the whole table on chunks
could lead to false positives. For example, a DataFrame with 200 rows split
in chunks of 50 could pass tests expecting DataFrames to contain max 100 rows.

The following tests could have unexpected results:
  - tableRowCountToBeBetween
  - columnValuesSumToBeBetween
For datasets that don’t fit in memory and require full-table tests:
  1. Use TestRunner to validate after loading
  2. Focus DataFrame validation on column-level tests that do not require aggregation
  3. Split validation into two phases:
    • During ETL: Validate column-level quality with DataFrameValidator
    • After loading: Validate table-level metrics with TestRunner
Example two-phase approach:
# Phase 1: Validate chunks during ETL (column-level only)
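# (test-definition classes such as ColumnValuesToBeNotNull are imported from the SDK;
# the exact import path may vary by version)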
chunk_validator = DataFrameValidator()
chunk_validator.add_tests(
    ColumnValuesToBeNotNull(column="id"),
    ColumnValuesToBeUnique(column="id"),
    ColumnValuesToBeBetween(column="amount", min_value=0)
)

result = chunk_validator.run(
    pd.read_csv("data.csv", chunksize=10000),
    on_success=load_chunk,
    on_failure=rollback
)

# Phase 2: Validate table metrics after loading (table-level)
if result.success:
    from metadata.sdk.data_quality import TestRunner

    table_validator = TestRunner.for_table("Postgres.warehouse.staging.transactions")
    table_validator.add_tests(
        TableRowCountToBeBetween(min_count=10000),
        ColumnValuesSumToBeBetween(column="amount", min_value=1000000)
    )

    # Automatically publishes results for table tests only
    table_results = table_validator.run()

Best Practices

  1. Validate before loading: Catch issues before contaminating your warehouse
    result = validator.validate(transformed_df)
    if result.success:
        load_to_warehouse(transformed_df)
    
  2. Use transactional chunk processing: Ensure atomic all-or-nothing behavior
    with database_transaction():
        result = validator.run(chunks, on_success=load, on_failure=rollback)
    
  3. Leverage OpenMetadata tests: Let data stewards define quality criteria
    validator.add_openmetadata_table_tests("Table.FQN")
    
  4. Publish results: Enable tracking and alerting
    result.publish("Table.FQN")
    
  5. Handle failures gracefully: Don’t silently fail
    if not result.success:
        alert_team()
        rollback_transaction()
        raise DataQualityError(result)
    
  6. Use appropriate tests for chunks: Avoid full-table tests when processing chunks
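    # Column-level tests (as in the two-phase example above) are safe per chunk;
    # table-level row-count and sum tests belong in a post-load TestRunner phase
    validator.add_tests(ColumnValuesToBeNotNull(column="id"))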

Error Handling

Handle validation errors appropriately:
from metadata.sdk.data_quality.dataframes.dataframe_validator import DataFrameValidator

try:
    validator = DataFrameValidator()
    validator.add_openmetadata_table_tests("Postgres.warehouse.staging.orders")

    result = validator.validate(df)

    if not result.success:
        # Raise a distinct exception type so a data quality failure
        # is not confused with the configuration errors handled below
        log_validation_failures(result)
        raise RuntimeError("Data quality checks failed")

except ValueError as e:
    print(f"Configuration error: {e}")
    # e.g. table not found in OpenMetadata

except RuntimeError as e:
    print(f"Data quality failure: {e}")
    # Tests ran, but the data did not pass

except Exception as e:
    print(f"Validation error: {e}")
    # Other errors during validation

Next Steps