# Metadata Ingestion | Technical Architecture

> Technical deep dive into the OpenMetadata metadata ingestion framework — topology-based execution, source hierarchy, and how to add new connectors.

# Metadata Ingestion

The metadata ingestion framework lives at `ingestion/src/metadata/ingestion/`. It extracts metadata (databases, schemas, tables, columns, dashboards, pipelines, etc.) from external systems and publishes it to the OpenMetadata server.

This guide covers the core framework architecture. For usage and lineage ingestion — which build on these same patterns — see the dedicated pages.

## Core Pipeline

Every ingestion workflow follows the **Source → Sink** pattern, orchestrated by `workflow/metadata.py`:

```
┌──────────────────────────────────────────────────────────┐
│  Source                                                  │
│  Topology-driven metadata extraction                     │
│                                                          │
│  Traverses: Service → Database → Schema → Table → Column │
│  Yields: Either[CreateEntityRequest] per entity          │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  MetadataRestSink                                        │
│  sink/metadata_rest.py                                   │
│                                                          │
│  For each CreateEntityRequest:                           │
│    PUT /api/v1/{entityType}                              │
│    (or PATCH if fingerprint changed)                     │
└──────────────────────────────────────────────────────────┘
```

## Topology-Based Execution

The key architectural pattern is the **topology**. Instead of hand-coded loops, each source declares a tree of `TopologyNode` objects that the framework traverses depth-first.

### How It Works

A topology is a tree where each node has:

* **`producer`** — a method that yields raw items (e.g., database names)
* **`stages`** — processing steps that transform each item into an entity request
* **`children`** — child nodes to recurse into
* **`post_process`** — cleanup methods run after all children complete
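
The four attributes above can be pictured as a small tree data structure. The sketch below is a minimal stand-in rather than the framework's actual model: `ToyNode` and `walk` are hypothetical names, and producers, stages, and post-process steps are assumed to be plain method-name strings.

```python
from dataclasses import dataclass, field
from typing import Iterator, List


@dataclass
class ToyNode:
    """Hypothetical stand-in for a topology node (not the real TopologyNode)."""
    name: str
    producer: str                                           # method that yields raw items
    stages: List[str] = field(default_factory=list)         # methods that build entity requests
    children: List["ToyNode"] = field(default_factory=list)
    post_process: List[str] = field(default_factory=list)   # cleanup after the children finish


def walk(node: ToyNode, depth: int = 0) -> Iterator[str]:
    """Yield one indented line per node, parents before children (depth-first)."""
    yield f"{'  ' * depth}{node.name} <- {node.producer}()"
    for child in node.children:
        yield from walk(child, depth + 1)


table = ToyNode("table", "get_tables_name_and_type", stages=["yield_table"])
schema = ToyNode("databaseSchema", "get_database_schema_names",
                 stages=["yield_database_schema"], children=[table])
database = ToyNode("database", "get_database_names",
                   stages=["yield_database"], children=[schema])

outline = list(walk(database))
```

Printing `outline` line by line gives an indented view of the tree, with each node shown next to the producer that feeds it.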

### Database Service Topology

This is the topology for all SQL database connectors (`source/database/database_service.py`):

```
root (Service)
  │  producer: get_services()
  │  stages: yield_create_request_database_service()
  │
  └─ database
       │  producer: get_database_names()
       │  stages: yield_database()
       │
       └─ databaseSchema
            │  producer: get_database_schema_names()
            │  stages: yield_database_schema()
            │
            ├─ table
            │    producer: get_tables_name_and_type()
            │    stages:
            │      1. yield_table_tag_details()     → Tags
            │      2. yield_table()                 → CreateTableRequest
            │      3. yield_life_cycle_data()       → LifeCycle
            │
            └─ storedProcedure
                 producer: get_stored_procedures()
                 stages: yield_stored_procedure()

  post_process: mark_tables_as_deleted()
```

### Execution Engine

`TopologyRunnerMixin` (`api/topology_runner.py`) drives the traversal:

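```python
# Simplified sketch of the traversal; the real method handles more cases.
# Producers and processors are looked up by method name on the source.
def process_nodes(self, nodes):
    for node in nodes:
        producer = getattr(self, node.producer)             # e.g., get_database_names
        for item in producer():
            for stage in node.stages:
                processor = getattr(self, stage.processor)  # e.g., yield_database
                yield from processor(item)                  # results flow on to the sink
            yield from self.process_nodes(node.children)    # recurse into schemas, tables, ...
        for post in node.post_process or []:
            yield from getattr(self, post)()                # e.g., mark_tables_as_deleted
```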

Nodes marked with `threads=True` distribute items across a thread pool. Each thread gets a copy of the `TopologyContext`.
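
To make the control flow concrete, here is a small self-contained runner over an in-memory catalog. It is a sketch under stated assumptions: `Node`, `ToySource`, and the `CATALOG` dictionary are hypothetical, producers and stages are looked up by method name, threading is left out, and a plain list stands in for the sink.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterator, List

# Hypothetical in-memory "external system": database -> schemas
CATALOG: Dict[str, List[str]] = {"sales": ["public", "staging"], "hr": ["public"]}


@dataclass
class Node:
    producer: str
    stages: List[str]
    children: List["Node"] = field(default_factory=list)
    post_process: List[str] = field(default_factory=list)


class ToySource:
    def __init__(self) -> None:
        self.current_db = ""  # minimal stand-in for the traversal context

    def get_database_names(self) -> Iterator[str]:
        yield from CATALOG

    def yield_database(self, name: str) -> Iterator[str]:
        self.current_db = name  # remember the position for child nodes
        yield f"database:{name}"

    def get_schema_names(self) -> Iterator[str]:
        yield from CATALOG[self.current_db]

    def yield_schema(self, name: str) -> Iterator[str]:
        yield f"schema:{self.current_db}.{name}"

    def mark_deleted(self) -> Iterator[str]:
        yield "post_process:mark_deleted"

    def process_nodes(self, nodes: List[Node]) -> Iterator[str]:
        for node in nodes:
            for item in getattr(self, node.producer)():
                for stage in node.stages:
                    yield from getattr(self, stage)(item)
                yield from self.process_nodes(node.children)
            for post in node.post_process:
                yield from getattr(self, post)()


topology = [
    Node("get_database_names", ["yield_database"],
         children=[Node("get_schema_names", ["yield_schema"])],
         post_process=["mark_deleted"])
]
emitted = list(ToySource().process_nodes(topology))
```

Each database is emitted before its schemas, and the post-process step runs once after every database has been visited.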

### Topology Context

`TopologyContext` is a state object that tracks the current position in the hierarchy; it stays thread-safe because each worker thread operates on its own copy. As the traversal descends, it stores the current service, database, schema, and so on, so that child nodes can build fully qualified names (FQNs):

```
service.database.schema.table.column
```
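
As a rough illustration of how such a context can produce FQNs, the sketch below keeps one slot per level and joins the levels that are set. `ToyContext` is a hypothetical class, and the example assumes names contain no dots, so no quoting is applied.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyContext:
    """Hypothetical context: one slot per level of the hierarchy."""
    service: Optional[str] = None
    database: Optional[str] = None
    schema: Optional[str] = None
    table: Optional[str] = None

    def fqn(self) -> str:
        """Join the levels set so far; stops at the first unset level."""
        parts = []
        for level in (self.service, self.database, self.schema, self.table):
            if level is None:
                break
            parts.append(level)
        return ".".join(parts)


ctx = ToyContext(service="prod_postgres")
ctx.database = "sales"    # set while processing the database node
ctx.schema = "public"     # set while processing the schema node
schema_fqn = ctx.fqn()
ctx.table = "orders"      # set while processing the table node
table_fqn = ctx.fqn()
```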

## Source Class Hierarchy

For database connectors, the class hierarchy is:

```
Source (api/steps.py)
│   Abstract: prepare(), test_connection(), _iter()
│
└─ DatabaseServiceSource (source/database/database_service.py)
   │   Defines DatabaseServiceTopology
   │   Implements: get_services(), yield_database(), yield_database_schema(),
   │               yield_table(), mark_tables_as_deleted()
   │
   └─ CommonDbSourceService (source/database/common_db_source.py)
      │   Implements: get_database_names(), get_database_schema_names(),
      │               get_tables_name_and_type(), get_columns_and_constraints()
      │   Uses: SqlColumnHandlerMixin, SqlAlchemySource
      │
      └─ [Concrete connectors]
           PostgresSource, MySQLSource, SnowflakeSource, BigQuerySource, ...
           Override database-specific methods
```
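
The layering can be sketched with three toy classes: a base that owns the traversal, a shared layer with default listing logic, and a concrete connector that overrides only what differs. All class names here are hypothetical, and the method signatures are simplified for the example.

```python
from abc import ABC, abstractmethod
from typing import Iterator, List


class ToyDatabaseSource(ABC):
    """Hypothetical base: owns the traversal, leaves listing to subclasses."""

    @abstractmethod
    def get_database_names(self) -> Iterator[str]: ...

    @abstractmethod
    def get_database_schema_names(self, database: str) -> Iterator[str]: ...

    def run(self) -> List[str]:
        return [f"{db}.{schema}"
                for db in self.get_database_names()
                for schema in self.get_database_schema_names(db)]


class ToyCommonSource(ToyDatabaseSource):
    """Hypothetical shared SQL layer: a single default database and schema."""

    def get_database_names(self) -> Iterator[str]:
        yield "default"

    def get_database_schema_names(self, database: str) -> Iterator[str]:
        yield "public"


class ToyWarehouseSource(ToyCommonSource):
    """Hypothetical concrete connector: overrides only the database listing."""

    def get_database_names(self) -> Iterator[str]:
        yield from ("analytics", "raw")


names = ToyWarehouseSource().run()
```

The concrete class inherits the schema listing unchanged, which is the same division of labor the hierarchy above describes.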

### Key Methods in CommonDbSourceService

| Method                          | What It Does                                      |
| ------------------------------- | ------------------------------------------------- |
| `get_database_names()`          | Lists databases (single or multi-database)        |
| `get_database_schema_names()`   | Lists schemas, applies filter patterns            |
| `get_tables_name_and_type()`    | Lists tables/views via SQLAlchemy Inspector       |
| `get_columns_and_constraints()` | Extracts columns, PKs, FKs, unique constraints    |
| `get_schema_definition()`       | Gets DDL or view definition SQL                   |
| `yield_table()`                 | Assembles `CreateTableRequest` from all the above |

### Column Type Parsing

`ColumnTypeParser` (`source/database/column_type_parser.py`) maps database-specific types to OpenMetadata's `DataType` enum:

* Handles complex types: JSON, ARRAY, STRUCT, MAP
* Extracts precision/scale for numeric types
* Supports database-specific type aliases
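
A heavily simplified version of this kind of mapping might look like the sketch below. It is not the `ColumnTypeParser` implementation: `ToyDataType`, `ALIASES`, and `parse_type` are hypothetical, and only a handful of scalar types are covered.

```python
import re
from enum import Enum
from typing import Optional, Tuple


class ToyDataType(Enum):
    """Hypothetical subset of a unified type enum."""
    INT = "INT"
    DECIMAL = "DECIMAL"
    VARCHAR = "VARCHAR"
    JSON = "JSON"
    UNKNOWN = "UNKNOWN"


# Hypothetical alias table: source-specific spellings -> unified type
ALIASES = {
    "INTEGER": ToyDataType.INT, "INT4": ToyDataType.INT, "INT": ToyDataType.INT,
    "NUMERIC": ToyDataType.DECIMAL, "DECIMAL": ToyDataType.DECIMAL,
    "VARCHAR": ToyDataType.VARCHAR, "CHARACTER VARYING": ToyDataType.VARCHAR,
    "JSON": ToyDataType.JSON, "JSONB": ToyDataType.JSON,
}

# Type name, optionally followed by "(precision)" or "(precision, scale)"
TYPE_RE = re.compile(r"^\s*([A-Za-z0-9_ ]+?)\s*(?:\(\s*(\d+)\s*(?:,\s*(\d+)\s*)?\))?\s*$")


def parse_type(raw: str) -> Tuple[ToyDataType, Optional[int], Optional[int]]:
    """Return (type, precision_or_length, scale) for a raw column type string."""
    match = TYPE_RE.match(raw)
    if not match:
        return ToyDataType.UNKNOWN, None, None
    name, first, second = match.groups()
    data_type = ALIASES.get(name.upper(), ToyDataType.UNKNOWN)
    return (data_type,
            int(first) if first else None,
            int(second) if second else None)
```

For example, `parse_type("numeric(10, 2)")` resolves the alias to `ToyDataType.DECIMAL` and returns precision 10 and scale 2, while an unrecognized name falls back to `ToyDataType.UNKNOWN`.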

### Constraint Handling

`SqlColumnHandlerMixin` (`source/database/sql_column_handler.py`) processes:

* **Primary keys** — from `inspector.get_pk_constraint()`
* **Unique constraints** — from `inspector.get_unique_constraints()`
* **Foreign keys** — from `inspector.get_foreign_keys()`, including cross-database references
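
The three inspector calls return plain dictionaries, so the per-column bookkeeping can be shown without a live database. In the sketch below the sample data is hand-written in the shape SQLAlchemy's Inspector returns; `column_constraints`, the constraint labels, and the precedence rule (primary key over unique over foreign key) are assumptions made for the example.

```python
from typing import Any, Dict, List

# Hand-written sample data shaped like SQLAlchemy Inspector results
pk = {"name": "orders_pkey", "constrained_columns": ["id"]}
uniques = [{"name": "orders_ref_key", "column_names": ["reference"]}]
fks = [{
    "name": "orders_customer_fk",
    "constrained_columns": ["customer_id"],
    "referred_schema": "crm",
    "referred_table": "customers",
    "referred_columns": ["id"],
}]


def column_constraints(pk: Dict[str, Any],
                       uniques: List[Dict[str, Any]],
                       fks: List[Dict[str, Any]]) -> Dict[str, str]:
    """Map each constrained column to one label (PK wins over UNIQUE over FK)."""
    result: Dict[str, str] = {}
    for fk in fks:
        for col in fk["constrained_columns"]:
            result[col] = "FOREIGN_KEY"
    for unique in uniques:
        # Only single-column unique constraints describe a column on their own
        if len(unique["column_names"]) == 1:
            result[unique["column_names"][0]] = "UNIQUE"
    for col in pk.get("constrained_columns", []):
        result[col] = "PRIMARY_KEY"
    return result


constraints = column_constraints(pk, uniques, fks)
```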

## Other Service Topologies

The same topology pattern applies to all service types:

### Dashboard Service

```
Service → DataModel → Dashboard → Chart
                                → DashboardLineage
                                → DashboardUsage
```

### Pipeline Service

```
Service → Pipeline → PipelineStatus
                   → PipelineLineage
                   → PipelineUsage
```

### Messaging Service

```
Service → Topic → TopicSampleData
```

### Storage, Search, ML Model, API Services

Each follows the same pattern with service-specific entity types.

## MetadataRestSink

The sink (`sink/metadata_rest.py`) uses `@singledispatchmethod` to route each record to a handler based on its type:

```python
from functools import singledispatchmethod

# Sink, AddLineageRequest and ProfilerResponse come from the OpenMetadata
# ingestion package; their imports are omitted here.


class MetadataRestSink(Sink):
    @singledispatchmethod
    def _run_dispatch(self, record):
        # Default for types without a registered handler: PUT to bulk API
        return self.write_create_request(record)

    @_run_dispatch.register
    def write_lineage(self, record: AddLineageRequest): ...

    @_run_dispatch.register
    def write_profiler_response(self, record: ProfilerResponse): ...

    # ... specialized handlers for tags, lineage, profiles, test results, etc.
```
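
The dispatch mechanism can be tried in isolation with the standard library alone. The following sketch uses hypothetical record classes (`ToyCreateTable`, `ToyLineage`) and a hypothetical `ToySink`; only `functools.singledispatchmethod` is real.

```python
from dataclasses import dataclass
from functools import singledispatchmethod
from typing import Any


@dataclass
class ToyCreateTable:      # hypothetical generic create request
    name: str


@dataclass
class ToyLineage:          # hypothetical lineage record
    upstream: str
    downstream: str


class ToySink:
    @singledispatchmethod
    def run(self, record: Any) -> str:
        # Fallback for any type without a registered handler
        return f"create:{type(record).__name__}"

    @run.register
    def _(self, record: ToyLineage) -> str:
        # Chosen automatically because of the annotation on `record`
        return f"lineage:{record.upstream}->{record.downstream}"


toy_sink = ToySink()
results = [toy_sink.run(ToyCreateTable("orders")),
           toy_sink.run(ToyLineage("raw", "clean"))]
```

Adding support for a new record type then means registering one more handler, with no change to the code that calls `run`.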

### Fingerprinting

Every entity carries a `sourceHash` computed from its metadata. The sink uses this to decide:

* **CREATE** — entity doesn't exist yet
* **PATCH** — entity exists but hash differs (metadata changed)
* **SKIP** — entity exists and hash matches (no changes)

This enables **incremental ingestion** — only changed entities are written.
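
The three-way decision can be sketched in a few lines. The hashing scheme below (SHA-256 over canonical JSON) and the names `source_hash` and `decide` are assumptions for illustration; the source does not specify how `sourceHash` is actually computed.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def source_hash(payload: Dict[str, Any]) -> str:
    """Hash a request's content; sorted keys keep the digest stable across runs."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def decide(payload: Dict[str, Any], stored_hash: Optional[str]) -> str:
    """Return CREATE, PATCH or SKIP given the hash stored for the entity, if any."""
    if stored_hash is None:
        return "CREATE"                     # entity does not exist yet
    if stored_hash == source_hash(payload):
        return "SKIP"                       # nothing changed since the last run
    return "PATCH"                          # entity exists but its metadata changed


table_v1 = {"name": "orders", "columns": ["id", "total"]}
table_v2 = {"name": "orders", "columns": ["id", "total", "currency"]}
stored = source_hash(table_v1)
decisions = [decide(table_v1, None), decide(table_v1, stored), decide(table_v2, stored)]
```

Here the first run creates the table, an identical second run is skipped, and adding a column triggers a patch.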

## Filtering

Filters are applied at the producer level (before any processing):

```python
# Simplified excerpt
def get_database_schema_names(self):
    for schema in self.inspector.get_schema_names():
        # filter_by_schema returns True when the schema should be skipped
        if filter_by_schema(self.source_config.schemaFilterPattern, schema):
            self.status.filter(schema, "Schema filtered out")
            continue
        yield schema
```

Available filter patterns:

* `databaseFilterPattern` — include/exclude databases by regex
* `schemaFilterPattern` — include/exclude schemas
* `tableFilterPattern` — include/exclude tables
* Plus service-specific filters (topic, pipeline, dashboard, etc.)
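
A minimal include/exclude filter could be written as follows. `ToyFilterPattern` and `is_filtered_out` are hypothetical, and the matching rules (case-insensitive `re.match`, excludes taking precedence over includes) are assumptions rather than the documented behavior of `utils/filters.py`.

```python
import re
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyFilterPattern:
    """Hypothetical filter config: lists of regexes to include and exclude."""
    includes: List[str] = field(default_factory=list)
    excludes: List[str] = field(default_factory=list)


def is_filtered_out(pattern: ToyFilterPattern, name: str) -> bool:
    """True when `name` should be skipped. Excludes take precedence (an assumption)."""
    if any(re.match(rx, name, re.IGNORECASE) for rx in pattern.excludes):
        return True
    if pattern.includes:
        # With an include list, anything that matches none of it is skipped
        return not any(re.match(rx, name, re.IGNORECASE) for rx in pattern.includes)
    return False


pattern = ToyFilterPattern(includes=["^sales_.*"], excludes=[".*_tmp$"])
kept = [s for s in ["sales_eu", "sales_us_tmp", "hr"] if not is_filtered_out(pattern, s)]
```

Only `sales_eu` survives: `sales_us_tmp` hits the exclude pattern and `hr` matches no include pattern.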

## Connection Management

`source/connections.py` dynamically loads connection classes and creates service connections:

* Each service type has a connection class (e.g., `PostgresConnection`, `SnowflakeConnection`)
* `get_connection()` resolves the class and returns a SQLAlchemy Engine or API client
* `test_connection()` validates connectivity before processing
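
Runtime class resolution of this kind is usually built on `importlib`. The sketch below shows the general technique only: `load_class`, `REGISTRY`, and `get_toy_connection` are hypothetical, and standard-library classes stand in for connector classes so the example runs without any service installed.

```python
import importlib
from typing import Any


def load_class(module_path: str, class_name: str) -> Any:
    """Import `module_path` at runtime and return the attribute `class_name`."""
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


# Hypothetical registry: service type -> (module, class). Standard-library
# classes stand in for connection classes here.
REGISTRY = {
    "ordered": ("collections", "OrderedDict"),
    "counter": ("collections", "Counter"),
}


def get_toy_connection(service_type: str) -> Any:
    """Resolve the class for `service_type` and instantiate it."""
    module_path, class_name = REGISTRY[service_type]
    return load_class(module_path, class_name)()


conn = get_toy_connection("counter")
```

Because the module is imported only when requested, a connector's dependencies need to be installed only where that connector is actually used.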

## Multi-Threading

A node opts in to parallel processing by setting `threads=True`:

```python
table = TopologyNode(
    producer="get_tables_name_and_type",
    stages=[...],
    threads=True,  # Process tables in parallel
)
```

The `TopologyContextManager` ensures each thread gets an isolated copy of the context while sharing the database connection.
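
The isolation idea can be illustrated with a thread pool in which every task works on its own copy of the parent context. This is a sketch of the general pattern, not of `TopologyContextManager`: the dictionary-based context and `process_table` are hypothetical.

```python
import copy
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

# Hypothetical parent context, filled in while descending to the schema level
parent_context: Dict[str, str] = {"service": "prod", "database": "sales", "schema": "public"}


def process_table(table: str) -> str:
    # Each task works on its own copy, so writes never leak across threads
    context = copy.deepcopy(parent_context)
    context["table"] = table
    return ".".join(context[k] for k in ("service", "database", "schema", "table"))


tables: List[str] = ["orders", "customers", "invoices"]
with ThreadPoolExecutor(max_workers=2) as pool:
    fqns = list(pool.map(process_table, tables))  # map preserves input order
```

After the pool finishes, `parent_context` still has no `table` key, which is the property that keeps parallel tables from overwriting each other's position.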

## OpenMetadata API Client

The `OpenMetadata` class (`ometa/ometa_api.py`) is composed of 25+ mixins:

| Mixin                   | Purpose                      |
| ----------------------- | ---------------------------- |
| `OMetaTableMixin`       | Table CRUD, usage, profiling |
| `OMetaPipelineMixin`    | Pipeline operations          |
| `OMetaDashboardMixin`   | Dashboard/chart operations   |
| `OMetaPatchMixin`       | Incremental updates (PATCH)  |
| `OMetaTagGlossaryMixin` | Tag and glossary management  |
| `OMetaServiceMixin`     | Service CRUD                 |
| `OMetaLineageMixin`     | Lineage operations           |

The underlying REST client (`ometa/client.py`) handles authentication, retries, and response parsing.
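
Mixin composition keeps each group of operations in its own class while sharing one transport. The sketch below is a toy version of that layout: every class name and URL path in it is hypothetical, and the fake client records calls instead of sending HTTP requests.

```python
from typing import Dict, List


class ToyRestClient:
    """Hypothetical transport: records calls instead of sending HTTP requests."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def put(self, path: str) -> Dict[str, str]:
        self.calls.append(f"PUT {path}")
        return {"path": path}


class ToyTableMixin:
    client: ToyRestClient  # provided by the composing class

    def create_table(self, name: str) -> Dict[str, str]:
        return self.client.put(f"/tables/{name}")


class ToyLineageMixin:
    client: ToyRestClient  # provided by the composing class

    def add_lineage(self, edge: str) -> Dict[str, str]:
        return self.client.put(f"/lineage/{edge}")


class ToyClient(ToyTableMixin, ToyLineageMixin):
    """Each mixin adds one group of operations; all share `self.client`."""

    def __init__(self) -> None:
        self.client = ToyRestClient()


api = ToyClient()
api.create_table("orders")
api.add_lineage("raw-to-clean")
```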

## Key Design Patterns

| Pattern               | Where                             | Why                                                |
| --------------------- | --------------------------------- | -------------------------------------------------- |
| **Topology**          | `ServiceTopology`, `TopologyNode` | Declarative entity hierarchy — no hand-coded loops |
| **Either monad**      | `Either[T]` wrapping all results  | Unified error handling without exceptions          |
| **Singledispatch**    | `MetadataRestSink._run_dispatch`  | Route entity types to specialized handlers         |
| **Fingerprinting**    | `sourceHash` on all entities      | Incremental ingestion (create vs. patch vs. skip)  |
| **Mixin composition** | `OpenMetadata` with 25+ mixins    | Modular API client without deep inheritance        |
| **Dynamic import**    | `get_connection()`, class loading | Runtime resolution of connector implementations    |
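
Of these patterns, the Either wrapper is the least familiar to many Python developers, so a short sketch may help. `ToyEither` and `yield_tables` are hypothetical, and the error side is reduced to a plain string for brevity.

```python
from dataclasses import dataclass
from typing import Generic, Iterator, List, Optional, TypeVar

T = TypeVar("T")


@dataclass
class ToyEither(Generic[T]):
    """Hypothetical Either: exactly one of `left` (error) or `right` (value) is set."""
    left: Optional[str] = None
    right: Optional[T] = None


def yield_tables(names: List[str]) -> Iterator[ToyEither[str]]:
    for name in names:
        try:
            if not name:
                raise ValueError("empty table name")
            yield ToyEither(right=f"CreateTable({name})")
        except ValueError as exc:
            # The failure becomes a value; the loop keeps going
            yield ToyEither(left=str(exc))


results = list(yield_tables(["orders", "", "customers"]))
ok = [r.right for r in results if r.right is not None]
errors = [r.left for r in results if r.left is not None]
```

One bad item produces one `left` entry and does not interrupt the rest of the stream, which is what lets a long ingestion run report failures without aborting.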

## Key Files Quick Reference

| What you want to do                | Start here                                          |
| ---------------------------------- | --------------------------------------------------- |
| Understand the framework pipeline  | `api/steps.py` (Source, Sink, Processor interfaces) |
| See how topology execution works   | `api/topology_runner.py` → `process_nodes()`        |
| Read the database service topology | `source/database/database_service.py`               |
| See base SQL extraction logic      | `source/database/common_db_source.py`               |
| See column type parsing            | `source/database/column_type_parser.py`             |
| See constraint extraction          | `source/database/sql_column_handler.py`             |
| Read a concrete connector          | `source/database/postgres/metadata.py` (simplest)   |
| See how entities are published     | `sink/metadata_rest.py`                             |
| Read the API client                | `ometa/ometa_api.py`                                |
| See filtering logic                | `utils/filters.py`                                  |
| See topology data models           | `models/topology.py`                                |

<Tip>
  All paths above are relative to `ingestion/src/metadata/ingestion/`. For example, `api/steps.py` means `ingestion/src/metadata/ingestion/api/steps.py`.
</Tip>
