Metadata Ingestion
The metadata ingestion framework lives at ingestion/src/metadata/ingestion/. It extracts metadata (databases, schemas, tables, columns, dashboards, pipelines, etc.) from external systems and publishes it to the OpenMetadata server.
This guide covers the core framework architecture. For usage and lineage ingestion — which build on these same patterns — see the dedicated pages.
Core Pipeline
Every ingestion workflow follows the Source → Sink pattern, orchestrated by workflow/metadata.py:
┌──────────────────────────────────────────────────────────┐
│ Source │
│ Topology-driven metadata extraction │
│ │
│ Traverses: Service → Database → Schema → Table → Column │
│ Yields: Either[CreateEntityRequest] per entity │
└──────────────────────┬───────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ MetadataRestSink │
│ ingestion/sink/metadata_rest.py │
│ │
│ For each CreateEntityRequest: │
│ PUT /api/v1/{entityType} │
│ (or PATCH if fingerprint changed) │
└──────────────────────────────────────────────────────────┘
Topology-Based Execution
The key architectural pattern is the topology. Instead of hand-coded loops, each source declares a tree of TopologyNode objects that the framework traverses depth-first.
How It Works
A topology is a tree where each node has:
- producer — a method that yields raw items (e.g., database names)
- stages — processing steps that transform each item into an entity request
- children — child nodes to recurse into
- post_process — cleanup methods run after all children complete
Database Service Topology
This is the topology for all SQL database connectors (source/database/database_service.py):
root (Service)
│ producer: get_services()
│ stages: yield_create_request_database_service()
│
└─ database
│ producer: get_database_names()
│ stages: yield_database()
│
└─ databaseSchema
│ producer: get_database_schema_names()
│ stages: yield_database_schema()
│
├─ table
│ producer: get_tables_name_and_type()
│ stages:
│ 1. yield_table_tag_details() → Tags
│ 2. yield_table() → CreateTableRequest
│ 3. yield_life_cycle_data() → LifeCycle
│
└─ storedProcedure
producer: get_stored_procedures()
stages: yield_stored_procedure()
post_process: mark_tables_as_deleted()
Execution Engine
TopologyRunnerMixin (api/topology_runner.py) drives the traversal:
process_nodes(topology_nodes):
    for node in topology_nodes:
        for item in node.producer():            # e.g., get_database_names()
            for stage in node.stages:
                entity = stage.processor(item)  # e.g., yield_database()
                # send entity to sink
            process_nodes(node.children)        # recurse into schemas, tables, ...
        for post in node.post_process:
            post()                              # e.g., mark_tables_as_deleted()
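The pseudocode above can be made concrete with a minimal, runnable sketch. Note that `Node`, the `sink` list, and the producer/stage callables here are illustrative stand-ins, not the real `TopologyNode`/`TopologyRunnerMixin` API:

```python
from dataclasses import dataclass, field
from typing import Callable, Iterable, List

@dataclass
class Node:
    """Toy analogue of a TopologyNode: produce items, process them, recurse."""
    producer: Callable[[], Iterable]
    stages: List[Callable]
    children: List["Node"] = field(default_factory=list)
    post_process: List[Callable] = field(default_factory=list)

def process_nodes(nodes, sink):
    for node in nodes:
        for item in node.producer():
            for stage in node.stages:
                sink.append(stage(item))        # "send to sink"
            process_nodes(node.children, sink)  # recurse per item
        for post in node.post_process:
            post()                              # cleanup after children

# A two-level tree: a "database" node with a "table" child.
sink = []
leaf = Node(producer=lambda: ["t1", "t2"], stages=[str.upper])
root = Node(producer=lambda: ["db"], stages=[lambda name: f"svc.{name}"], children=[leaf])
process_nodes([root], sink)
# sink is now ["svc.db", "T1", "T2"]: parent entity first, then children
```

The depth-first order matters: a parent entity must reach the sink before its children, since children reference the parent by FQN.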
Nodes marked with threads=True distribute items across a thread pool. Each thread gets a copy of the TopologyContext.
Topology Context
TopologyContext is a thread-safe state object that tracks the current position in the hierarchy. As the traversal descends, it stores the current service, database, schema, etc., so child nodes can build fully qualified names (FQNs):
service.database.schema.table.column
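The FQN-building idea can be sketched as follows. The `Context` class and its methods are hypothetical simplifications; the real `TopologyContext` tracks richer state (and real FQNs quote parts containing dots):

```python
class Context:
    """Toy stand-in for TopologyContext: remember the current hierarchy position."""
    def __init__(self):
        self.parts = {}

    def set(self, level, name):
        self.parts[level] = name  # called as the traversal descends

    def fqn(self, *levels):
        return ".".join(self.parts[level] for level in levels)

ctx = Context()
ctx.set("service", "postgres_prod")
ctx.set("database", "sales")
ctx.set("schema", "public")
ctx.set("table", "orders")
ctx.fqn("service", "database", "schema", "table")
# -> "postgres_prod.sales.public.orders"
```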
Source Class Hierarchy
For database connectors, the class hierarchy is:
Source (api/steps.py)
│ Abstract: prepare(), test_connection(), _iter()
│
└─ DatabaseServiceSource (source/database/database_service.py)
│ Defines DatabaseServiceTopology
│ Implements: get_services(), yield_database(), yield_database_schema(),
│ yield_table(), mark_tables_as_deleted()
│
└─ CommonDbSourceService (source/database/common_db_source.py)
│ Implements: get_database_names(), get_database_schema_names(),
│ get_tables_name_and_type(), get_columns_and_constraints()
│ Uses: SqlColumnHandlerMixin, SqlAlchemySource
│
└─ [Concrete connectors]
PostgresSource, MySQLSource, SnowflakeSource, BigQuerySource, ...
Override database-specific methods
Key Methods in CommonDbSourceService
| Method | What It Does |
|---|---|
| get_database_names() | Lists databases (single or multi-database) |
| get_database_schema_names() | Lists schemas, applies filter patterns |
| get_tables_name_and_type() | Lists tables/views via SQLAlchemy Inspector |
| get_columns_and_constraints() | Extracts columns, PKs, FKs, unique constraints |
| get_schema_definition() | Gets DDL or view definition SQL |
| yield_table() | Assembles CreateTableRequest from all of the above |
Column Type Parsing
ColumnTypeParser (source/database/column_type_parser.py) maps database-specific types to OpenMetadata’s DataType enum:
- Handles complex types: JSON, ARRAY, STRUCT, MAP
- Extracts precision/scale for numeric types
- Supports database-specific type aliases
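The core of such a parser is a raw-type-to-enum map plus precision/scale extraction. Below is a small sketch of that idea; the type map, function name, and regex are illustrative, and the real ColumnTypeParser covers many more types and aliases:

```python
import re

# Illustrative subset of a raw-type -> canonical-type map.
_TYPE_MAP = {
    "VARCHAR": "VARCHAR", "CHARACTER VARYING": "VARCHAR",
    "INT": "INT", "INTEGER": "INT", "INT4": "INT",
    "NUMERIC": "DECIMAL", "DECIMAL": "DECIMAL",
    "JSONB": "JSON", "JSON": "JSON",
}

def parse_column_type(raw: str):
    """Map a raw DB type like 'numeric(10,2)' to (data_type, precision, scale)."""
    match = re.match(r"(\w[\w ]*?)\s*(?:\((\d+)(?:,\s*(\d+))?\))?$", raw.strip())
    base, precision, scale = match.group(1).upper(), match.group(2), match.group(3)
    data_type = _TYPE_MAP.get(base, "UNKNOWN")
    return (
        data_type,
        int(precision) if precision else None,
        int(scale) if scale else None,
    )

parse_column_type("numeric(10,2)")  # -> ("DECIMAL", 10, 2)
parse_column_type("varchar(255)")   # -> ("VARCHAR", 255, None)
```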
Constraint Handling
SqlColumnHandlerMixin (source/database/sql_column_handler.py) processes:
- Primary keys — from inspector.get_pk_constraint()
- Unique constraints — from inspector.get_unique_constraints()
- Foreign keys — from inspector.get_foreign_keys(), including cross-database references
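A sketch of how such inspector results can be merged into per-column constraint flags. The input dict shapes mirror SQLAlchemy's Inspector return values, but the merge helper itself is a hypothetical simplification of what the mixin does:

```python
def build_constraint_flags(columns, pk, uniques, fks):
    """Merge inspector-style constraint payloads into {column: set-of-flags}."""
    flags = {col: set() for col in columns}
    # get_pk_constraint() returns one dict with 'constrained_columns'
    for col in pk.get("constrained_columns", []):
        flags[col].add("PRIMARY_KEY")
    # get_unique_constraints() returns a list of dicts with 'column_names'
    for uq in uniques:
        for col in uq.get("column_names", []):
            flags[col].add("UNIQUE")
    # get_foreign_keys() returns a list of dicts with 'constrained_columns'
    for fk in fks:
        for col in fk.get("constrained_columns", []):
            flags[col].add("FOREIGN_KEY")
    return flags
```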
Other Service Topologies
The same topology pattern applies to all service types:
Dashboard Service
Service → DataModel → Dashboard → Chart
→ DashboardLineage
→ DashboardUsage
Pipeline Service
Service → Pipeline → PipelineStatus
→ PipelineLineage
→ PipelineUsage
Messaging Service
Service → Topic → TopicSampleData
Storage, Search, ML Model, API Services
Each follows the same pattern with service-specific entity types.
Sink Dispatch
The sink (sink/metadata_rest.py) uses @singledispatchmethod to route entity types:
class MetadataRestSink(Sink):
@singledispatchmethod
def _run_dispatch(self, record):
return self.write_create_request(record) # Default: PUT to bulk API
@_run_dispatch.register
def write_lineage(self, record: AddLineageRequest): ...
@_run_dispatch.register
def write_profiler_response(self, record: ProfilerResponse): ...
# ... specialized handlers for tags, lineage, profiles, test results, etc.
Fingerprinting
Every entity carries a sourceHash computed from its metadata. The sink uses this to decide:
- CREATE — entity doesn’t exist yet
- PATCH — entity exists but hash differs (metadata changed)
- SKIP — entity exists and hash matches (no changes)
This enables incremental ingestion — only changed entities are written.
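The decision logic can be sketched in a few lines. The hash construction and helper names here are assumptions for illustration; the real sourceHash computation lives in the framework:

```python
import hashlib
import json

def source_hash(entity: dict) -> str:
    """Deterministic fingerprint: hash the canonical JSON form of the metadata."""
    canonical = json.dumps(entity, sort_keys=True).encode()
    return hashlib.md5(canonical).hexdigest()

def decide(existing_hash, entity) -> str:
    """CREATE / PATCH / SKIP based on the stored hash vs. the fresh one."""
    new_hash = source_hash(entity)
    if existing_hash is None:
        return "CREATE"   # entity doesn't exist yet
    if existing_hash != new_hash:
        return "PATCH"    # metadata changed
    return "SKIP"         # nothing to do
```

Sorting the keys before hashing is what makes the fingerprint stable: two semantically identical payloads with different key order must produce the same hash.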
Filtering
Filters are applied at the producer level (before any processing):
# In get_database_schema_names():
for schema in inspector.get_schema_names():
if filter_by_schema(self.source_config.schemaFilterPattern, schema):
self.status.filter(schema, "Schema filtered out")
continue
yield schema
Available filter patterns:
- databaseFilterPattern — include/exclude databases by regex
- schemaFilterPattern — include/exclude schemas
- tableFilterPattern — include/exclude tables
- Plus service-specific filters (topic, pipeline, dashboard, etc.)
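The include/exclude semantics can be sketched as below. This assumes excludes take precedence and an empty include list means "allow all" — a common convention, but check utils/filters.py for the authoritative behavior; the function name here is also hypothetical:

```python
import re

def filter_by_pattern(pattern, name) -> bool:
    """Return True when *name* should be filtered OUT."""
    if pattern is None:
        return False  # no pattern configured: keep everything
    # Assumption: excludes win over includes.
    for regex in pattern.get("excludes", []):
        if re.match(regex, name):
            return True
    includes = pattern.get("includes", [])
    if includes:
        # Assumption: a non-empty include list is an allow-list.
        return not any(re.match(regex, name) for regex in includes)
    return False
```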
Connection Management
source/connections.py dynamically loads and creates database connections:
- Each service type has a connection class (e.g., PostgresConnection, SnowflakeConnection)
- get_connection() resolves the class and returns a SQLAlchemy Engine or API client
- test_connection() validates connectivity before processing
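The dynamic-resolution idea boils down to importing a module by name and pulling a class off it. A generic sketch (the helper name is made up; the real get_connection() layers service-specific wiring on top of this pattern):

```python
import importlib

def load_class(dotted_path: str):
    """Resolve 'package.module.ClassName' to the class object at runtime."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)

# Works for any importable attribute, e.g. a stdlib function:
dumps = load_class("json.dumps")
```

This is what lets the framework ship one generic entry point while each connector's implementation is looked up by name only when its service type is actually configured.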
Multi-Threading
Nodes can enable multi-threaded processing for parallelism:
table = TopologyNode(
producer="get_tables_name_and_type",
stages=[...],
threads=True, # Process tables in parallel
)
The TopologyContextManager ensures each thread gets an isolated copy of the context while sharing the database connection.
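The per-thread isolation idea can be sketched with a thread pool and a deep copy per task. The names here are illustrative, not the real TopologyContextManager API:

```python
import copy
from concurrent.futures import ThreadPoolExecutor

def run_threaded(items, context, worker, max_workers=4):
    """Run worker(item, ctx) across a pool; each task gets its own context copy."""
    def task(item):
        local_ctx = copy.deepcopy(context)  # isolated per-task state
        return worker(item, local_ctx)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(task, items))

# Each worker can set its own "current table" without clobbering siblings:
context = {"schema": "public"}

def worker(item, ctx):
    ctx["table"] = item
    return f"{ctx['schema']}.{ctx['table']}"

run_threaded(["orders", "users"], context, worker)
# -> ["public.orders", "public.users"]; the shared context is untouched
```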
API Client
The OpenMetadata class (ometa/ometa_api.py) is composed of 25+ mixins:
| Mixin | Purpose |
|---|---|
| OMetaTableMixin | Table CRUD, usage, profiling |
| OMetaPipelineMixin | Pipeline operations |
| OMetaDashboardMixin | Dashboard/chart operations |
| OMetaPatchMixin | Incremental updates (PATCH) |
| OMetaTagGlossaryMixin | Tag and glossary management |
| OMetaServiceMixin | Service CRUD |
| OMetaLineageMixin | Lineage operations |
The underlying REST client (ometa/client.py) handles authentication, retries, and response parsing.
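The composition style looks like the toy example below. These mixin classes are invented for illustration; the real mixins wrap REST calls against the shared client:

```python
class TableMixin:
    """One concern per mixin: table-related API surface."""
    def get_table(self, fqn):
        return {"type": "table", "fqn": fqn}  # real mixin issues a GET here

class LineageMixin:
    """Another concern: lineage edges."""
    def add_lineage(self, src, dst):
        return {"edge": (src, dst)}           # real mixin issues a PUT here

class Client(TableMixin, LineageMixin):
    """One client object exposes every per-concern surface via MRO."""

client = Client()
client.get_table("svc.db.schema.orders")
client.add_lineage("svc.db.schema.orders", "svc.db.schema.report")
```

Each mixin stays small and independently testable, while callers see a single flat API, which is the stated alternative to a deep inheritance chain.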
Key Design Patterns
| Pattern | Where | Why |
|---|---|---|
| Topology | ServiceTopology, TopologyNode | Declarative entity hierarchy — no hand-coded loops |
| Either monad | Either[T] wrapping all results | Unified error handling without exceptions |
| Singledispatch | MetadataRestSink._run_dispatch | Route entity types to specialized handlers |
| Fingerprinting | sourceHash on all entities | Incremental ingestion (create vs. patch vs. skip) |
| Mixin composition | OpenMetadata with 25+ mixins | Modular API client without deep inheritance |
| Dynamic import | get_connection(), class loading | Runtime resolution of connector implementations |
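Of these, the Either pattern is the least familiar in Python. A minimal sketch of the idea (field names chosen to mirror the left/right convention; the real Either[T] model lives in the framework's model definitions):

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")

@dataclass
class Either(Generic[T]):
    """right carries a successful value, left carries an error description."""
    right: Optional[T] = None
    left: Optional[str] = None

def safe_parse(raw: str) -> "Either[int]":
    """A step that reports failure as data instead of raising."""
    try:
        return Either(right=int(raw))
    except ValueError as exc:
        return Either(left=f"bad value {raw!r}: {exc}")
```

Because failures travel through the pipeline as ordinary values, one bad table never aborts the run; the workflow tallies lefts into its status report and keeps going.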
Key Files Quick Reference
| What you want to do | Start here |
|---|---|
| Understand the framework pipeline | api/steps.py (Source, Sink, Processor interfaces) |
| See how topology execution works | api/topology_runner.py → process_nodes() |
| Read the database service topology | source/database/database_service.py |
| See base SQL extraction logic | source/database/common_db_source.py |
| See column type parsing | source/database/column_type_parser.py |
| See constraint extraction | source/database/sql_column_handler.py |
| Read a concrete connector | source/database/postgres/metadata.py (simplest) |
| See how entities are published | sink/metadata_rest.py |
| Read the API client | ometa/ometa_api.py |
| See filtering logic | utils/filters.py |
| See topology data models | models/topology.py |
All paths above are relative to ingestion/src/metadata/ingestion/. For example, api/steps.py means ingestion/src/metadata/ingestion/api/steps.py.