Metadata Ingestion

The metadata ingestion framework lives at ingestion/src/metadata/ingestion/. It extracts metadata (databases, schemas, tables, columns, dashboards, pipelines, etc.) from external systems and publishes it to the OpenMetadata server. This guide covers the core framework architecture. For usage and lineage ingestion — which build on these same patterns — see the dedicated pages.

Core Pipeline

Every ingestion workflow follows the Source → Sink pattern, orchestrated by workflow/metadata.py:
┌──────────────────────────────────────────────────────────┐
│  Source                                                  │
│  Topology-driven metadata extraction                     │
│                                                          │
│  Traverses: Service → Database → Schema → Table → Column │
│  Yields: Either[CreateEntityRequest] per entity          │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  MetadataRestSink                                        │
│  ingestion/sink/metadata_rest.py                         │
│                                                          │
│  For each CreateEntityRequest:                           │
│    PUT /api/v1/{entityType}                              │
│    (or PATCH if fingerprint changed)                     │
└──────────────────────────────────────────────────────────┘

Topology-Based Execution

The key architectural pattern is the topology. Instead of hand-coded loops, each source declares a tree of TopologyNode objects that the framework traverses depth-first.

How It Works

A topology is a tree where each node has:
  • producer — a method that yields raw items (e.g., database names)
  • stages — processing steps that transform each item into an entity request
  • children — child nodes to recurse into
  • post_process — cleanup methods run after all children complete
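The shape of such a node can be sketched roughly as follows. This is a simplified stand-in, not the framework's real model: in the actual `TopologyNode`, `producer` and `stages` reference source method names, and each stage is a richer `NodeStage` object with more configuration than shown here.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class TopologyNode:
    """Simplified stand-in for the framework's TopologyNode model."""
    producer: str                                           # source method that yields raw items
    stages: List[str] = field(default_factory=list)         # processing steps applied per item
    children: List["TopologyNode"] = field(default_factory=list)
    post_process: List[str] = field(default_factory=list)   # run after all children complete

# A two-level slice of a database topology:
schema_node = TopologyNode(producer="get_database_schema_names",
                           stages=["yield_database_schema"])
database_node = TopologyNode(producer="get_database_names",
                             stages=["yield_database"],
                             children=[schema_node])
```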

Database Service Topology

This is the topology for all SQL database connectors (source/database/database_service.py):
root (Service)
  │  producer: get_services()
  │  stages: yield_create_request_database_service()

  └─ database
       │  producer: get_database_names()
       │  stages: yield_database()

       └─ databaseSchema
            │  producer: get_database_schema_names()
            │  stages: yield_database_schema()

            ├─ table
            │    producer: get_tables_name_and_type()
            │    stages:
            │      1. yield_table_tag_details()     → Tags
            │      2. yield_table()                 → CreateTableRequest
            │      3. yield_life_cycle_data()       → LifeCycle

            └─ storedProcedure
                 producer: get_stored_procedures()
                 stages: yield_stored_procedure()

  post_process: mark_tables_as_deleted()

Execution Engine

TopologyRunnerMixin (api/topology_runner.py) drives the traversal:
process_nodes(topology_nodes)
  for node in topology_nodes:
    for item in node.producer():          # e.g., get_database_names()
      for stage in node.stages:
        entity = stage.processor(item)    # e.g., yield_database()
        → send to sink
      process_nodes(node.children)        # recurse into schemas, tables, ...
    for post in node.post_process:
      post()                              # e.g., mark_tables_as_deleted()
Nodes marked with threads=True distribute items across a thread pool. Each thread gets a copy of the TopologyContext.
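The depth-first traversal above can be reproduced as a runnable miniature. Here producers and stages are plain callables and the "sink" is a list, whereas the real engine resolves method names on the source and streams results to the sink step:

```python
from dataclasses import dataclass, field
from typing import Callable, Iterable, List

@dataclass
class Node:
    producer: Callable[[], Iterable]
    stages: List[Callable]
    children: List["Node"] = field(default_factory=list)
    post_process: List[Callable] = field(default_factory=list)

def process_nodes(nodes: List[Node], sink: list) -> None:
    # Depth-first: produce items, run each stage, then recurse into children
    for node in nodes:
        for item in node.producer():
            for stage in node.stages:
                sink.append(stage(item))
            process_nodes(node.children, sink)
        for post in node.post_process:
            post()

# Toy topology: one "database" node with a "schema" child
schema = Node(producer=lambda: ["public"], stages=[lambda s: f"schema:{s}"])
db = Node(producer=lambda: ["analytics"], stages=[lambda d: f"db:{d}"],
          children=[schema])

out = []
process_nodes([db], out)
# out == ["db:analytics", "schema:public"]
```

Note how the parent's stage output lands in the sink before any child output: the engine descends only after processing the current item, which is what guarantees a database exists server-side before its schemas are written.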

Topology Context

TopologyContext is a thread-safe state object that tracks the current position in the hierarchy. As the traversal descends, it stores the current service, database, schema, etc., so child nodes can build fully qualified names (FQNs):
service.database.schema.table.column
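A hypothetical sketch of how such a context lets child stages assemble FQNs; the real `TopologyContext` is richer and thread-aware, but the idea is the same: each level of the descent records its current name.

```python
class Context:
    """Toy stand-in for TopologyContext: tracks the current hierarchy position."""
    def __init__(self):
        self.stack = {}

    def push(self, level: str, name: str) -> None:
        self.stack[level] = name

    def fqn(self, *levels: str) -> str:
        # Join the recorded names for the requested levels, in order
        return ".".join(self.stack[level] for level in levels)

ctx = Context()
ctx.push("service", "mysql_prod")
ctx.push("database", "shop")
ctx.push("schema", "public")
ctx.push("table", "orders")
fqn = ctx.fqn("service", "database", "schema", "table")
# fqn == "mysql_prod.shop.public.orders"
```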

Source Class Hierarchy

For database connectors, the class hierarchy is:
Source (api/steps.py)
│   Abstract: prepare(), test_connection(), _iter()

└─ DatabaseServiceSource (source/database/database_service.py)
   │   Defines DatabaseServiceTopology
   │   Implements: get_services(), yield_database(), yield_database_schema(),
   │               yield_table(), mark_tables_as_deleted()

   └─ CommonDbSourceService (source/database/common_db_source.py)
      │   Implements: get_database_names(), get_database_schema_names(),
      │               get_tables_name_and_type(), get_columns_and_constraints()
      │   Uses: SqlColumnHandlerMixin, SqlAlchemySource

      └─ [Concrete connectors]
           PostgresSource, MySQLSource, SnowflakeSource, BigQuerySource, ...
           Override database-specific methods

Key Methods in CommonDbSourceService

  • get_database_names() — lists databases (single or multi-database)
  • get_database_schema_names() — lists schemas, applies filter patterns
  • get_tables_name_and_type() — lists tables/views via the SQLAlchemy Inspector
  • get_columns_and_constraints() — extracts columns, PKs, FKs, unique constraints
  • get_schema_definition() — gets DDL or view definition SQL
  • yield_table() — assembles the CreateTableRequest from all of the above

Column Type Parsing

ColumnTypeParser (source/database/column_type_parser.py) maps database-specific types to OpenMetadata’s DataType enum:
  • Handles complex types: JSON, ARRAY, STRUCT, MAP
  • Extracts precision/scale for numeric types
  • Supports database-specific type aliases
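The core of such a parser can be illustrated with a small sketch. The mapping table and function below are illustrative only; the real `ColumnTypeParser` covers far more types and database-specific aliases.

```python
import re

# Illustrative subset of a raw-type -> OpenMetadata DataType mapping
_TYPE_MAP = {
    "VARCHAR": "VARCHAR", "TEXT": "TEXT",
    "INT": "INT", "INTEGER": "INT",
    "NUMERIC": "NUMBER", "DECIMAL": "NUMBER",
}

def parse_column_type(raw: str):
    """Map a raw type like 'NUMERIC(10,2)' to (data_type, precision, scale)."""
    m = re.match(r"(\w+)(?:\((\d+)(?:,\s*(\d+))?\))?", raw.strip().upper())
    name, precision, scale = m.group(1), m.group(2), m.group(3)
    return (
        _TYPE_MAP.get(name, "UNKNOWN"),
        int(precision) if precision else None,
        int(scale) if scale else None,
    )

# parse_column_type("numeric(10,2)") == ("NUMBER", 10, 2)
# parse_column_type("varchar(255)")  == ("VARCHAR", 255, None)
```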

Constraint Handling

SqlColumnHandlerMixin (source/database/sql_column_handler.py) processes:
  • Primary keys — from inspector.get_pk_constraint()
  • Unique constraints — from inspector.get_unique_constraints()
  • Foreign keys — from inspector.get_foreign_keys(), including cross-database references
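A sketch of normalizing that inspector output into per-column constraint flags. The dict shapes below mimic what SQLAlchemy's `get_pk_constraint()`, `get_unique_constraints()`, and `get_foreign_keys()` return; the aggregation logic is illustrative, not the mixin's actual implementation.

```python
def column_constraints(pk: dict, uniques: list, fks: list) -> dict:
    """Collect constraint flags per column from inspector-shaped results."""
    flags = {}
    for col in pk.get("constrained_columns", []):
        flags.setdefault(col, set()).add("PRIMARY_KEY")
    for uq in uniques:
        for col in uq.get("column_names", []):
            flags.setdefault(col, set()).add("UNIQUE")
    for fk in fks:
        for col in fk.get("constrained_columns", []):
            flags.setdefault(col, set()).add("FOREIGN_KEY")
    return flags

flags = column_constraints(
    pk={"constrained_columns": ["id"], "name": "orders_pkey"},
    uniques=[{"name": "uq_email", "column_names": ["email"]}],
    fks=[{"constrained_columns": ["user_id"], "referred_table": "users",
          "referred_columns": ["id"]}],
)
# flags == {"id": {"PRIMARY_KEY"}, "email": {"UNIQUE"}, "user_id": {"FOREIGN_KEY"}}
```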

Other Service Topologies

The same topology pattern applies to all service types:

Dashboard Service

Service → DataModel → Dashboard → Chart
                                → DashboardLineage
                                → DashboardUsage

Pipeline Service

Service → Pipeline → PipelineStatus
                   → PipelineLineage
                   → PipelineUsage

Messaging Service

Service → Topic → TopicSampleData

Storage, Search, ML Model, API Services

Each follows the same pattern with service-specific entity types.

MetadataRestSink

The sink (sink/metadata_rest.py) uses @singledispatchmethod to route entity types:
from functools import singledispatchmethod

class MetadataRestSink(Sink):
    @singledispatchmethod
    def _run_dispatch(self, record):
        return self.write_create_request(record)  # Default: PUT to bulk API

    @_run_dispatch.register
    def write_lineage(self, record: AddLineageRequest): ...

    @_run_dispatch.register
    def write_profiler_response(self, record: ProfilerResponse): ...

    # ... specialized handlers for tags, lineage, profiles, test results, etc.
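The dispatch mechanism itself can be demonstrated at toy scale. `MiniSink` and `AddLineageRequest` below are stand-ins, but the pattern is the same one the sink uses: the default handler catches anything unregistered, and `@run.register` routes by the type annotation of the record argument.

```python
from dataclasses import dataclass
from functools import singledispatchmethod

@dataclass
class AddLineageRequest:
    """Toy record type standing in for the real lineage request."""
    edge: str

class MiniSink:
    @singledispatchmethod
    def run(self, record):
        return f"PUT {record}"            # default: generic create request

    @run.register
    def _(self, record: AddLineageRequest):
        return f"LINEAGE {record.edge}"   # specialized handler, chosen by type

sink = MiniSink()
# sink.run("table-request")               -> "PUT table-request"
# sink.run(AddLineageRequest(edge="a->b")) -> "LINEAGE a->b"
```

The payoff is that adding support for a new record type means registering one handler, with no if/elif chain to maintain in the sink's main loop.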

Fingerprinting

Every entity carries a sourceHash computed from its metadata. The sink uses this to decide:
  • CREATE — entity doesn’t exist yet
  • PATCH — entity exists but hash differs (metadata changed)
  • SKIP — entity exists and hash matches (no changes)
This enables incremental ingestion — only changed entities are written.

Filtering

Filters are applied at the producer level (before any processing):
# In get_database_schema_names():
for schema in inspector.get_schema_names():
    if filter_by_schema(self.source_config.schemaFilterPattern, schema):
        self.status.filter(schema, "Schema filtered out")
        continue
    yield schema
Available filter patterns:
  • databaseFilterPattern — include/exclude databases by regex
  • schemaFilterPattern — include/exclude schemas
  • tableFilterPattern — include/exclude tables
  • Plus service-specific filters (topic, pipeline, dashboard, etc.)
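The include/exclude semantics can be sketched with plain regexes. The helper below is an analogue of the real `filter_by_*` functions in utils/filters.py, not their actual implementation:

```python
import re

def filtered_out(name: str, includes=None, excludes=None) -> bool:
    """True if `name` should be dropped under the given regex patterns."""
    if excludes and any(re.match(p, name) for p in excludes):
        return True                       # explicit exclusion wins
    if includes and not any(re.match(p, name) for p in includes):
        return True                       # include list given, no match
    return False

schemas = ["public", "pg_catalog", "information_schema", "sales"]
kept = [s for s in schemas
        if not filtered_out(s, excludes=[r"pg_.*", r"information_schema"])]
# kept == ["public", "sales"]
```

Because the check runs in the producer, a filtered schema is skipped before any of its tables are even listed, which is what keeps filtering cheap.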

Connection Management

source/connections.py dynamically loads and creates database connections:
  • Each service type has a connection class (e.g., PostgresConnection, SnowflakeConnection)
  • get_connection() resolves the class and returns a SQLAlchemy Engine or API client
  • test_connection() validates connectivity before processing
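The dynamic-loading part boils down to `importlib` resolution, roughly as below. The helper name is hypothetical; a stdlib class stands in for a connection class like PostgresConnection.

```python
import importlib

def load_class(module_path: str, class_name: str):
    """Resolve a class at runtime from its module path and name."""
    module = importlib.import_module(module_path)
    return getattr(module, class_name)

# Stdlib stand-in for resolving e.g. a service-specific connection class:
OrderedDict = load_class("collections", "OrderedDict")
# OrderedDict.__name__ == "OrderedDict"
```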

Multi-Threading

Nodes can enable multi-threaded processing for parallelism:
table = TopologyNode(
    producer="get_tables_name_and_type",
    stages=[...],
    threads=True,  # Process tables in parallel
)
The TopologyContextManager ensures each thread gets an isolated copy of the context while sharing the database connection.
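The isolation idea can be sketched with a thread pool and per-task context copies. This is an analogy for what `TopologyContextManager` does (it keys contexts by thread), not its actual code:

```python
import copy
from concurrent.futures import ThreadPoolExecutor

# Shared parent context; each worker gets its own deep copy to mutate
base_context = {"service": "mysql_prod", "database": "shop", "table": None}

def process_table(table: str) -> str:
    ctx = copy.deepcopy(base_context)   # isolated copy: no cross-thread races
    ctx["table"] = table
    return f"{ctx['service']}.{ctx['database']}.{ctx['table']}"

with ThreadPoolExecutor(max_workers=4) as pool:
    fqns = sorted(pool.map(process_table, ["orders", "users"]))
# fqns == ["mysql_prod.shop.orders", "mysql_prod.shop.users"]
```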

OpenMetadata API Client

The OpenMetadata class (ometa/ometa_api.py) is composed of 25+ mixins:
  • OMetaTableMixin — table CRUD, usage, profiling
  • OMetaPipelineMixin — pipeline operations
  • OMetaDashboardMixin — dashboard/chart operations
  • OMetaPatchMixin — incremental updates (PATCH)
  • OMetaTagGlossaryMixin — tag and glossary management
  • OMetaServiceMixin — service CRUD
  • OMetaLineageMixin — lineage operations
The underlying REST client (ometa/client.py) handles authentication, retries, and response parsing.
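The composition style looks roughly like the sketch below. Mixin and method names here are illustrative, not the real OMeta mixin APIs; the point is that each mixin contributes one domain's operations while sharing the low-level transport methods.

```python
class TableMixin:
    """One domain's operations, built on the shared transport."""
    def get_table(self, fqn: str):
        return self._get(f"/tables/name/{fqn}")

class LineageMixin:
    def add_lineage(self, edge: str):
        return self._put("/lineage", edge)

class MiniOpenMetadata(TableMixin, LineageMixin):
    """Composed client; in reality the transport delegates to a REST client."""
    def _get(self, path):
        return ("GET", path)
    def _put(self, path, body):
        return ("PUT", path, body)

client = MiniOpenMetadata()
# client.get_table("svc.db.schema.t") -> ("GET", "/tables/name/svc.db.schema.t")
# client.add_lineage("a->b")          -> ("PUT", "/lineage", "a->b")
```

Flat composition keeps each domain's code in its own file without a deep inheritance chain: the final class is just the sum of its mixins plus the transport.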

Key Design Patterns

  • Topology (ServiceTopology, TopologyNode) — declarative entity hierarchy, no hand-coded loops
  • Either monad (Either[T] wrapping all results) — unified error handling without exceptions
  • Singledispatch (MetadataRestSink._run_dispatch) — routes entity types to specialized handlers
  • Fingerprinting (sourceHash on all entities) — incremental ingestion (create vs. patch vs. skip)
  • Mixin composition (OpenMetadata with 25+ mixins) — modular API client without deep inheritance
  • Dynamic import (get_connection(), class loading) — runtime resolution of connector implementations

Key Files Quick Reference

  • Understand the framework pipeline — api/steps.py (Source, Sink, Processor interfaces)
  • See how topology execution works — api/topology_runner.py, process_nodes()
  • Read the database service topology — source/database/database_service.py
  • See base SQL extraction logic — source/database/common_db_source.py
  • See column type parsing — source/database/column_type_parser.py
  • See constraint extraction — source/database/sql_column_handler.py
  • Read a concrete connector — source/database/postgres/metadata.py (simplest)
  • See how entities are published — sink/metadata_rest.py
  • Read the API client — ometa/ometa_api.py
  • See filtering logic — utils/filters.py
  • See topology data models — models/topology.py
All paths above are relative to ingestion/src/metadata/ingestion/. For example, api/steps.py means ingestion/src/metadata/ingestion/api/steps.py.