Develop the Ingestion Code
We recommend taking some time to understand how the Ingestion Framework works by reading this short article. The main takeaways for developing a new connector are:
- Each of our Source Types (Databases, Dashboards, etc.) has a Topology attached.
- The process flow is implemented as a generator chain, going through each step.
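To make the generator-chain idea concrete, here is a minimal, framework-free sketch. The step names (`source`, `enrich`, `sink`) and the record shape are hypothetical and do not correspond to OpenMetadata classes. They only show how each step lazily pulls items from the previous one, so a single record travels through the whole chain before the next one is produced.

```python
from typing import Callable, Dict, Iterable, Iterator, List

Record = Dict[str, str]


def source() -> Iterator[Record]:
    """Hypothetical source step: yields raw entities one at a time."""
    for name in ("orders", "customers"):
        yield {"name": name}


def enrich(records: Iterable[Record]) -> Iterator[Record]:
    """Hypothetical middle step: transforms each record as it arrives."""
    for record in records:
        yield {**record, "fqn": f"my_service.my_db.my_schema.{record['name']}"}


def sink(records: Iterable[Record], write: Callable[[Record], None]) -> int:
    """Hypothetical sink: pulls records through the chain and writes each one."""
    count = 0
    for record in records:
        write(record)  # a real sink would persist the entity here
        count += 1
    return count


written: List[Record] = []
# Nothing runs until the sink starts iterating over the chain
total = sink(enrich(source()), written.append)
```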
Service Spec
When developing a new database ingestion connector in OpenMetadata, ensure all necessary components are correctly configured. This guide outlines the steps required to define the connector’s ingestion capabilities using a service_spec.py file.
Why Use service_spec.py?
The service_spec.py file centralizes the definitions of sources, profilers, lineage, and other ingestion-related components for a connector. This approach helps standardize implementations across connectors, making it easier to manage ingestion workflows.
Steps to Develop a New Connector
1. Create the service_spec.py File
Add a service_spec.py file within the connector’s directory. This file will define the components needed for ingestion, such as metadata sources, lineage sources, profilers, and samplers.
2. Use the DefaultDatabaseSpec Class
The DefaultDatabaseSpec class simplifies the definition of connectors by bundling the required components. Import the DefaultDatabaseSpec and reference the appropriate classes for your connector.
3. Define the ServiceSpec
Customize the ServiceSpec object based on the features of your connector.
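A minimal sketch of such a configuration follows, using the BigQuery class names mentioned in step 4. To keep it runnable without OpenMetadata installed, DefaultDatabaseSpec and the connector classes are local stand-ins. The usage, profiler and sampler class names are hypothetical, and treating every component except the metadata source as optional is an assumption of this stand-in, not a statement about the real class.

```python
from dataclasses import dataclass
from typing import Optional


# Hypothetical stand-ins so this sketch runs on its own. In a real connector,
# import DefaultDatabaseSpec from the OpenMetadata ingestion package and your
# connector's own source, lineage, usage, profiler and sampler classes instead.
class BigquerySource: ...
class BigqueryLineageSource: ...
class BigqueryUsageSource: ...  # hypothetical name
class BigqueryProfiler: ...  # hypothetical name
class BigquerySampler: ...  # hypothetical name


@dataclass(frozen=True)
class DefaultDatabaseSpec:
    """Stand-in exposing the five fields described in this guide."""

    metadata_source_class: type
    lineage_source_class: Optional[type] = None
    usage_source_class: Optional[type] = None
    profiler_class: Optional[type] = None
    sampler_class: Optional[type] = None


# service_spec.py: one module-level ServiceSpec wiring the connector together
ServiceSpec = DefaultDatabaseSpec(
    metadata_source_class=BigquerySource,
    lineage_source_class=BigqueryLineageSource,
    usage_source_class=BigqueryUsageSource,
    profiler_class=BigqueryProfiler,
    sampler_class=BigquerySampler,
)

# A connector without lineage or usage support simply omits those components
MetadataOnlySpec = DefaultDatabaseSpec(metadata_source_class=BigquerySource)
```

The second object mirrors step 4: components your connector does not support are left out rather than filled with placeholder classes.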
4. Adjust Classes for Your Connector
Replace the example classes (e.g., BigquerySource, BigqueryLineageSource, etc.) with those specific to your connector. Depending on the connector’s features, you may include or exclude certain components like usage or profiling.
Components of service_spec.py
- metadata_source_class: Defines the class for metadata ingestion.
- lineage_source_class: Defines the class for lineage extraction.
- usage_source_class: Tracks data usage patterns.
- profiler_class: Profiles data for quality and insights.
- sampler_class: Samples data for efficient ingestion.
Example Workflow
Step 1: Add service_spec.py
Place the file in the connector’s directory.
Step 2: Configure Components
Define the ServiceSpec using the required classes, adjusting for your connector’s capabilities.
Step 3: Verify Integration
Run the ingestion workflow to test the connector and ensure all components are functioning correctly.
Service Topology
The Topology defines a series of Nodes and Stages that are executed hierarchically and describe how we extract the needed data from the sources. Starting from the Root node, we process the entities depth-first, following the topology tree through each node’s children. From the Service Topology you can understand which methods you need to implement:
- producer: Methods that fetch the entities we need to process.
- processor: Methods that yield a given Entity.
- post_process: Methods that yield a given Entity but run after all entities from that node have been processed.
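The traversal described above can be sketched with a simplified, hypothetical model. `Node`, `run` and the sample catalog are illustrative only; the real topology in database_service.py defines its nodes and stages differently. The sketch shows the three method kinds and the depth-first order: each database is processed, then its schemas, before the next database, and `post_process` runs last.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterable, Iterator, List, Optional


@dataclass
class Node:
    """Hypothetical, simplified topology node (not OpenMetadata's own classes)."""

    producer: Callable[[Optional[str]], Iterable[str]]  # fetches entities to process
    processor: Callable[[str], Iterator[str]]  # yields the Entity for one item
    children: List["Node"] = field(default_factory=list)
    post_process: Optional[Callable[[], Iterator[str]]] = None


def run(node: Node, parent: Optional[str] = None) -> Iterator[str]:
    """Walk the topology depth-first, yielding entities as they are produced."""
    for item in node.producer(parent):
        yield from node.processor(item)
        # Descend into the children for this item before moving to the next one
        for child in node.children:
            yield from run(child, item)
    # post_process only runs once every item of this node has been processed
    if node.post_process is not None:
        yield from node.post_process()


# Hypothetical source catalog: two databases with their schemas
CATALOG = {"db1": ["s1", "s2"], "db2": ["s3"]}


def yield_database(name: str) -> Iterator[str]:
    yield f"database:{name}"


def yield_schema(name: str) -> Iterator[str]:
    yield f"schema:{name}"


def yield_summary() -> Iterator[str]:
    yield "summary:done"


schema_node = Node(producer=lambda db: CATALOG[db], processor=yield_schema)
root = Node(
    producer=lambda _: list(CATALOG),
    processor=yield_database,
    children=[schema_node],
    post_process=yield_summary,
)

events = list(run(root))
```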
Example - DatabaseServiceTopology
Can be found in ingestion/src/metadata/ingestion/source/database/database_service.py
OpenMetadata 1.6.0 or later
Starting from 1.6.0, the OpenMetadata Ingestion Framework uses a ServiceSpec specification to define the entrypoints for the ingestion process.
Service Source
Now that you understand how the Ingestion Process works, you need to understand the Service Source. A Service Source is an abstract class that serves as the base for every Connector of that Source Type. Service Sources tend to have many methods and can look overwhelming at first glance, but you don’t need to worry: you only need to check which abstract methods your connector must implement.
Example - DatabaseServiceSource
Can be found in ingestion/src/metadata/ingestion/source/database/database_service.py
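The pattern can be illustrated with a heavily trimmed, hypothetical stand-in built on Python’s `abc` module. `ExampleServiceSource`, its two abstract methods and the connector classes are illustrative names, not the real DatabaseServiceSource API. Python refuses to instantiate a subclass that leaves any abstract method unimplemented, and `__abstractmethods__` lists exactly what is still required.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, Iterator, List


class ExampleServiceSource(ABC):
    """Hypothetical, heavily trimmed stand-in for a Service Source base class."""

    @abstractmethod
    def get_database_names(self) -> Iterable[str]:
        """Producer: list the databases to ingest."""

    @abstractmethod
    def yield_database(self, database_name: str) -> Iterator[Dict[str, Any]]:
        """Processor: yield the Entity (here, a plain dict) for one database."""

    def close(self) -> None:
        """Concrete method: inherited as-is unless a connector needs cleanup."""


class MyConnectorSource(ExampleServiceSource):
    """Hypothetical connector implementing only the abstract methods."""

    def __init__(self, databases: List[str]) -> None:
        self._databases = databases

    def get_database_names(self) -> Iterable[str]:
        yield from self._databases

    def yield_database(self, database_name: str) -> Iterator[Dict[str, Any]]:
        yield {"name": database_name}


class IncompleteSource(ExampleServiceSource):
    """Forgets yield_database, so instantiating it raises TypeError."""

    def get_database_names(self) -> Iterable[str]:
        return []


# Checklist of the methods a connector must implement, taken from the ABC itself
missing = sorted(ExampleServiceSource.__abstractmethods__)
```

Assuming the real base class is declared with `abc.ABC`, the same `sorted(<BaseClass>.__abstractmethods__)` call gives a quick checklist for your connector.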
Performance and Memory Management
Ingestion connectors run inside containers with fixed memory limits. Failing to handle pagination, memory cleanup, or resource management causes silent data loss or OOM crashes in production. These are the most critical patterns to follow.
Pagination (Required)
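The pagination mechanism differs per API (next-page tokens, offset/limit, page numbers, Link headers). This sketch assumes a cursor-style API and a hypothetical `fetch_page` callable that wraps one HTTP call and returns the page items plus the next cursor (`None` on the last page). It yields entities lazily, so callers never hold every page in memory at once.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple

Entity = Dict[str, Any]
Page = Tuple[List[Entity], Optional[str]]  # (items, next_cursor or None)


def paginate(fetch_page: Callable[[Optional[str]], Page]) -> Iterator[Entity]:
    """Yield every entity across all pages, following the cursor to the end."""
    cursor: Optional[str] = None
    seen: Set[str] = set()
    while True:
        items, cursor = fetch_page(cursor)
        yield from items
        if not cursor:
            return  # last page reached
        if cursor in seen:
            # Defensive guard: an API that repeats a cursor would loop forever
            raise RuntimeError(f"Pagination cursor repeated: {cursor}")
        seen.add(cursor)


# Hypothetical API with three pages; a real fetch_page would call the REST client
_PAGES: Dict[Optional[str], Page] = {
    None: ([{"id": 1}, {"id": 2}], "page-2"),
    "page-2": ([{"id": 3}, {"id": 4}], "page-3"),
    "page-3": ([{"id": 5}], None),
}


def fake_fetch_page(cursor: Optional[str]) -> Page:
    return _PAGES[cursor]
```

Calling `list(paginate(fake_fetch_page))` walks all three pages; stopping after the first call would silently drop entities 3 to 5.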
Every client method that fetches a list of entities from a REST API must implement pagination if the API supports it. Missing pagination is the most dangerous bug in connectors: it silently returns only the first page of results, with no error.
Lookup Optimization
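As an illustration, resolving each report’s folder path by scanning a folder list costs O(n) per report, while indexing the folders once makes each lookup O(1). The data shapes below are hypothetical API payloads.

```python
from typing import Dict, List, Optional

# Hypothetical API payloads: folders, and reports that point at a folder by ID
folders: List[Dict[str, str]] = [
    {"id": "f1", "path": "/sales"},
    {"id": "f2", "path": "/ops"},
]
reports: List[Dict[str, str]] = [
    {"name": "q1_revenue", "folder_id": "f1"},
    {"name": "uptime", "folder_id": "f2"},
    {"name": "orphan", "folder_id": "f9"},  # folder missing from the response
]

# Build the index once (O(n)) before iterating
folders_by_id: Dict[str, Dict[str, str]] = {f["id"]: f for f in folders}


def folder_path(report: Dict[str, str]) -> Optional[str]:
    """O(1) dictionary lookup instead of scanning `folders` for every report."""
    folder = folders_by_id.get(report["folder_id"])
    return folder["path"] if folder else None


paths = [folder_path(r) for r in reports]
```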
When you need to look up entities by ID or path during iteration, build a dictionary once and use O(1) lookups instead of iterating over a list every time.
Memory Management
Connectors that read files (especially storage connectors) or process large query results must manage memory carefully to avoid OOM errors:
- Never load entire files without a size check.
- Delete large objects after processing and call gc.collect().
- Use generators in yield methods; don’t accumulate results in a list.
- Bound all caches: use lru_cache with an explicit maxsize, or clear them between scopes.
- Stream query results: use .fetchmany(), not .all(), on large tables.
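The five rules above can be sketched together as follows. The 50 MiB limit, the cache size of 256 and all function names are illustrative assumptions. `sqlite3` stands in for the real database connection; DB-API cursors and SQLAlchemy result objects both expose `fetchmany()`.

```python
import gc
import os
import sqlite3
from functools import lru_cache
from typing import Iterator, Tuple

MAX_FILE_BYTES = 50 * 1024 * 1024  # illustrative limit (assumption); tune per connector


def iter_file_records(path: str, max_bytes: int = MAX_FILE_BYTES) -> Iterator[str]:
    """Size check first, then stream lines with a generator instead of a list."""
    if os.path.getsize(path) > max_bytes:
        return  # skip oversized files; a real connector should log a warning
    with open(path, encoding="utf-8") as handle:
        for line in handle:  # file objects are read lazily, line by line
            yield line.rstrip("\n")


def count_fields(payload: bytes) -> int:
    """Release a large intermediate object as soon as it is no longer needed."""
    parsed = payload.decode("utf-8").split(",")  # potentially large list
    total = len(parsed)
    del parsed  # drop the last reference...
    gc.collect()  # ...and run a collection before the next large payload
    return total


@lru_cache(maxsize=256)  # bounded cache; 256 is an illustrative size
def owner_for(entity_id: str) -> str:
    """Stand-in for an expensive per-entity API call (hypothetical)."""
    return f"owner-of-{entity_id}"


def stream_rows(
    conn: sqlite3.Connection, query: str, batch_size: int = 1000
) -> Iterator[Tuple]:
    """Fetch rows in fixed-size batches instead of materializing the full result."""
    cursor = conn.execute(query)
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        yield from batch
```

To clear the cache between scopes (for example, after each database), call `owner_for.cache_clear()`.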
Connection Reuse
REST clients should create one requests.Session and reuse it for all requests.
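A minimal sketch of that pattern follows. `ExampleRestClient`, its constructor parameters and the 30-second default timeout are hypothetical. The Session is created once in `__init__`, every call goes through it, and it is closed when the client is done, which also releases the pooled connections.

```python
from typing import Any, Dict, Optional

import requests


class ExampleRestClient:
    """Hypothetical REST client that creates one Session and reuses it."""

    def __init__(
        self,
        base_url: str,
        headers: Optional[Dict[str, str]] = None,
        timeout: float = 30.0,
    ) -> None:
        self._base_url = base_url.rstrip("/")
        self._timeout = timeout
        # A single Session pools connections and shares headers across calls
        self._session = requests.Session()
        if headers:
            self._session.headers.update(headers)

    def get_json(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """Every request goes through the same Session (no per-call Session)."""
        response = self._session.get(
            f"{self._base_url}/{path.lstrip('/')}",
            params=params,
            timeout=self._timeout,
        )
        response.raise_for_status()
        return response.json()

    def close(self) -> None:
        """Release pooled connections when ingestion finishes."""
        self._session.close()

    def __enter__(self) -> "ExampleRestClient":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.close()
```

The context-manager methods are a design choice, not a requirement: they ensure the Session is closed even if ingestion fails partway.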
Next Step
With the code ready to go, we can now make a small change in the UI so that the Connector can be configured properly from there.
Apply the UI Changes
Learn what you need to do to be able to see the Connector properly in the UI