Documentation Index
Fetch the complete documentation index at: https://docs.open-metadata.org/llms.txt
Use this file to discover all available pages before exploring further.
Performance & Memory Management
Ingestion connectors run inside containers with fixed memory limits. Failing to handle pagination, memory cleanup, or resource management causes silent data loss or OOM crashes in production. These are the most critical patterns to follow.
Every client method that fetches a list of entities from a REST API must implement pagination if the API supports it. Missing pagination is the most dangerous bug in connectors — it silently returns only the first page of results with no error.
# WRONG — only gets the first page, silently drops remaining entities
def get_dashboards(self) -> list:
return self._get("/api/dashboards")["dashboards"]
# CORRECT — follows pagination links
def get_dashboards(self) -> list:
results = []
url = "/api/dashboards"
while url:
data = self._get(url)
results.extend(data["dashboards"])
url = data.get("next_link")
return results
# BEST — generator-based, yields one page at a time
def get_dashboards(self):
url = "/api/dashboards"
while url:
data = self._get(url)
yield from data["dashboards"]
url = data.get("next_link")
Missing pagination on a paginated API is a blocker in code review. Users see partial metadata and assume it’s complete — this is silent data loss.
Lookup Optimization
When you need to look up entities by ID or path during iteration, build a dictionary once and use O(1) lookups — don’t iterate a list every time.
# WRONG — O(n*m): for each dashboard, iterates all folders
def get_project_name(self, dashboard_details):
for folder in self.folders:
if folder.path == dashboard_details.folder_path:
return folder.name
return None
# CORRECT — O(1): build dict once in prepare()
def prepare(self):
super().prepare()
self.folders = self.client.get_folders()
self._folder_by_path = {f.path: f for f in self.folders}
def get_project_name(self, dashboard_details):
folder = self._folder_by_path.get(dashboard_details.folder_path)
return folder.name if folder else None
Memory Management
Connectors that read files (storage connectors especially) or process large query results must manage memory carefully to avoid OOM:
- Never load entire files without a size check:
# WRONG — OOMs on large files
content = self.client.get_object(Bucket=bucket, Key=path)["Body"].read()
data = json.loads(content)
# CORRECT — stream-parse without buffering
response = self.client.get_object(Bucket=bucket, Key=path)
data = json.load(response["Body"])
- Delete large objects after processing and call
gc.collect():
import gc
raw_data = self.client.fetch_all_entities()
parsed = [parse(item) for item in raw_data]
del raw_data
gc.collect()
- Use generators in yield methods — don’t accumulate results in a list:
# WRONG — holds all entities in memory
def yield_dashboard(self, dashboard_details):
results = []
for chart in dashboard_details.charts:
results.append(self._create_chart(chart))
return results
# CORRECT — yields one at a time
def yield_dashboard(self, dashboard_details):
for chart in dashboard_details.charts:
yield Either(right=self._create_chart(chart))
- Bound all caches — use
lru_cache(maxsize=) or clear between scopes:
from functools import lru_cache
@lru_cache(maxsize=1024)
def get_constraints(self, table_fqn: str):
return self._fetch_constraints(table_fqn)
- Stream query results — use
.fetchmany(), not .all() on large tables:
# WRONG — loads entire result set
result = session.execute(query).all()
# CORRECT — streams in batches
result = session.execute(query)
while batch := result.fetchmany(1000):
yield from batch
Connection Reuse
REST clients should create one requests.Session and reuse it for all requests:
# WRONG — creates new connection per request
def _get(self, endpoint):
response = requests.get(f"{self.base_url}{endpoint}")
return response.json()
# CORRECT — shared session with connection pooling
def __init__(self, config):
self._session = requests.Session()
self._session.headers["Authorization"] = f"Bearer {config.token.get_secret_value()}"
def _get(self, endpoint):
response = self._session.get(f"{self.base_url}{endpoint}")
response.raise_for_status()
return response.json()
For the full performance and memory standards with detailed patterns, see skills/standards/performance.md and skills/standards/memory.md in the OpenMetadata repository.
Next Step
With the Code ready to go, we can now proceed to make a small change in the UI to be able to configure the Connector properly from there.
Apply the UI Changes
Learn what you need to do to be able see the Connector properly in the UI