Advanced Usage
This guide covers advanced patterns and configurations for Data Quality as Code, including loading tests from YAML files, customizing workflow configurations, and integrating with production systems.
Loading Tests from YAML
You can load test definitions from YAML workflow files, enabling version-controlled test configurations:
Basic YAML Loading
from metadata.sdk.data_quality import TestRunner

# Load from YAML file
runner = TestRunner.from_yaml(file_path="tests/customer_quality.yaml")

# Or from YAML string
yaml_config = """
source:
  type: TestSuite
  serviceName: local_postgres
  sourceConfig:
    config:
      type: TestSuite
      entityFullyQualifiedName: Postgres.warehouse.public.customers
processor:
  type: orm-test-runner
  config:
    testCases:
      - name: customer_email_not_null
        testDefinitionName: columnValuesToBeNotNull
        columnName: email
      - name: customer_id_unique
        testDefinitionName: columnValuesToBeUnique
        columnName: customer_id
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: your-token-here
"""
runner = TestRunner.from_yaml(yaml_string=yaml_config)

# Run the loaded tests
results = runner.run()
By default, from_yaml() uses the connection configured via configure(). To use the connection from the YAML file:
runner = TestRunner.from_yaml(
    file_path="tests/config.yaml",
    use_connection_from_yaml=True
)
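For comparison, a minimal sketch of the default path, which reuses the global connection set up with configure() (the host and token values below are placeholders):
from metadata.sdk import configure
from metadata.sdk.data_quality import TestRunner

# Set up the SDK connection once (placeholder host and token)
configure(host="http://localhost:8585/api", jwt_token="your-token-here")

# Without use_connection_from_yaml, the runner reuses the configure() connection
runner = TestRunner.from_yaml(file_path="tests/customer_quality.yaml")
results = runner.run()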
YAML File Structure
A complete YAML configuration includes:
source:
  type: TestSuite
  serviceName: postgres_production
  sourceConfig:
    config:
      type: TestSuite
      entityFullyQualifiedName: Postgres.analytics.public.user_events
processor:
  type: orm-test-runner
  config:
    forceUpdate: false
    testCases:
      # Table-level tests
      - name: table_row_count_validation
        testDefinitionName: tableRowCountToBeBetween
        parameterValues:
          - name: minValue
            value: "10000"
          - name: maxValue
            value: "1000000"
      # Column-level tests
      - name: user_id_not_null
        testDefinitionName: columnValuesToBeNotNull
        columnName: user_id
      - name: event_timestamp_format
        testDefinitionName: columnValuesToMatchRegex
        columnName: event_timestamp
        parameterValues:
          - name: regex
            value: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}Z$"
workflowConfig:
  loggerLevel: INFO
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: ${OPENMETADATA_JWT_TOKEN}
Advanced TestRunner Configuration
Customizing Workflow Behavior
from metadata.sdk.data_quality import TestRunner, TableRowCountToBeBetween
from metadata.generated.schema.metadataIngestion.workflow import LogLevels

runner = TestRunner.for_table("BigQuery.analytics.events.user_sessions")

# Configure detailed settings
runner.setup(
    force_test_update=True,        # Update existing test definitions
    log_level=LogLevels.DEBUG,     # Enable debug logging
    raise_on_error=False,          # Continue on errors
    success_threshold=95,          # Require 95% success rate
    enable_streamable_logs=True    # Stream logs in real-time
)

# Add tests and run
runner.add_test(TableRowCountToBeBetween(min_count=1000))
results = runner.run()
Accessing Test Definitions
Inspect configured tests before running:
from metadata.sdk.data_quality import (
    TestRunner,
    TableRowCountToBeBetween,
    ColumnValuesToBeNotNull
)

runner = TestRunner.for_table("MySQL.ecommerce.public.orders")
runner.add_tests(
    TableRowCountToBeBetween(min_count=100),
    ColumnValuesToBeNotNull(column="order_id")
)

# Access test definitions
for test_def in runner.test_definitions:
    print(f"Test: {test_def.testDefinitionName}")
    print(f"Parameters: {test_def.parameterValues}")
DataFrame Validation Results
DataFrame validation results can be published back to OpenMetadata for tracking, alerting, and visualization:
from metadata.sdk.data_quality.dataframes.dataframe_validator import DataFrameValidator

# df is the DataFrame holding the data to validate (loaded elsewhere)
validator = DataFrameValidator()
validator.add_openmetadata_table_tests("Postgres.staging.public.customers")
result = validator.validate(df)

# Publish results to OpenMetadata
result.publish("Postgres.staging.public.customers")
Benefits of Publishing Results
- Historical tracking: View trends over time
- Alerting: Trigger notifications on failures
- Dashboards: Centralized data quality monitoring
- Collaboration: Share results across teams
- Compliance: Maintain audit trails
Error Handling and Retries
Implement robust error handling:
import time

from metadata.sdk.data_quality import TestRunner


def run_with_retry(table_fqn, max_retries=3, backoff=2):
    """Run tests with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            runner = TestRunner.for_table(table_fqn)
            results = runner.run()
            return results
        except ConnectionError:
            if attempt < max_retries - 1:
                wait_time = backoff ** attempt
                print(f"Connection failed, retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                print(f"Failed after {max_retries} attempts")
                raise
        except ValueError as e:
            print(f"Configuration error: {e}")
            raise  # Don't retry configuration errors
        except Exception as e:
            print(f"Unexpected error: {e}")
            raise


# Usage
results = run_with_retry("Postgres.warehouse.public.customers")
Dynamic Test Generation
Generate tests programmatically based on metadata:
from metadata.generated.schema.entity.data.table import Constraint, Table
from metadata.sdk import configure, client
from metadata.sdk.data_quality import (
    TestRunner,
    ColumnValuesToBeNotNull,
    ColumnValuesToBeUnique
)

configure(host="http://localhost:8585/api", jwt_token="token")

# Get table metadata
om_client = client()
table = om_client.ometa.get_by_name(
    entity=Table,
    fqn="Postgres.warehouse.public.customers"
)

# Generate tests based on column constraints
runner = TestRunner.for_table(table.fullyQualifiedName.root)

for column in table.columns:
    # Add NOT NULL tests for required columns
    if column.constraint == Constraint.NOT_NULL:
        runner.add_test(ColumnValuesToBeNotNull(column=column.name.root))
    # Add UNIQUE tests for primary keys
    if column.constraint == Constraint.PRIMARY_KEY:
        runner.add_test(ColumnValuesToBeUnique(column=column.name.root))

results = runner.run()
Multi-Table Validation
Validate multiple tables in a workflow:
from metadata.sdk.data_quality import TestRunner, TableRowCountToBeBetween

tables_to_validate = {
    "Postgres.warehouse.public.customers": {"min_rows": 10000},
    "Postgres.warehouse.public.orders": {"min_rows": 50000},
    "Postgres.warehouse.public.products": {"min_rows": 1000}
}

validation_results = {}

for table_fqn, config in tables_to_validate.items():
    runner = TestRunner.for_table(table_fqn)
    runner.add_test(TableRowCountToBeBetween(min_count=config["min_rows"]))
    results = runner.run()

    validation_results[table_fqn] = {
        "passed": all(r.testCaseResult.testCaseStatus == "Success" for r in results),
        "details": results
    }

# Generate summary
total_tables = len(validation_results)
passed_tables = sum(1 for v in validation_results.values() if v["passed"])

print(f"\n{'='*60}")
print(f"Validation Summary: {passed_tables}/{total_tables} tables passed")
print(f"{'='*60}")

for table_fqn, result in validation_results.items():
    status = "✓" if result["passed"] else "✗"
    print(f"{status} {table_fqn}")
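If these checks run inside a CI/CD pipeline, you may also want the job itself to fail when any table fails. A minimal sketch building on the summary variables above:
import sys

# A non-zero exit code marks the pipeline step as failed
if passed_tables < total_tables:
    print(f"{total_tables - passed_tables} table(s) failed validation")
    sys.exit(1)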
Best Practices Summary
- Version control test configurations: Store YAML configs in git
- Use environment variables: Never hardcode credentials (see the sketch after this list)
- Implement retries: Handle transient failures gracefully
- Publish results: Enable tracking and alerting in OpenMetadata
- Monitor execution: Track metrics for test runs
- Handle errors explicitly: Don’t silently swallow failures
- Document tests: Use descriptive names and descriptions
- Validate incrementally: Test early and often in pipelines
- Separate concerns: Let data stewards define tests, engineers execute them
- Test your tests: Ensure test definitions are correct
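For the environment-variable practice above, a minimal sketch, assuming the token is exported as OPENMETADATA_JWT_TOKEN (the same variable referenced in the YAML example earlier):
import os

from metadata.sdk import configure

# Read the JWT token from the environment instead of hardcoding it
configure(
    host="http://localhost:8585/api",
    jwt_token=os.environ["OPENMETADATA_JWT_TOKEN"]
)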
Next Steps