Source

The Source is the connector to external systems and outputs a record for downstream to process and push to OpenMetadata.

@dataclass  # type: ignore[misc]
class Source(Closeable, metaclass=ABCMeta):
ctx: WorkflowContext
    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Source":
        pass

    @abstractmethod
    def prepare(self):
        pass

    @abstractmethod
    def next_record(self) -> Iterable[Record]:
        pass

    @abstractmethod
    def get_status(self) -> SourceStatus:
        pass

create method is used to create an instance of Source.

prepare will be called through Python's init method. This will be a place where you could make connections to external sources or initiate the client library.

next_record is where the client can connect to an external resource and emit the data downstream.

get_status is for the workflow to call and report the status of the source such as how many records its processed any failures or warnings.

A simple example of this implementation is

class SampleTablesSource(Source):

    def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx):
        super().__init__(ctx)
        self.status = SampleTableSourceStatus()
        self.config = config
        self.metadata_config = metadata_config
        self.client = REST(metadata_config)
        self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r'))
        self.database = json.load(open(config.sample_schema_folder + "/database.json", 'r'))
        self.tables = json.load(open(config.sample_schema_folder + "/tables.json", 'r'))
        self.service = get_service_or_create(self.service_json, metadata_config)

    @classmethod
    def create(cls, config_dict, metadata_config_dict, ctx):
        config = SampleTableSourceConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(config, metadata_config, ctx)

    def prepare(self):
        pass

    def next_record(self) -> Iterable[Entity]:

        yield from self.yield_create_request_database_service(self.config)

        service_entity: DatabaseService = self.metadata.get_by_name(
            entity=DatabaseService, fqn=self.config.serviceName
        )
        service_id = service_entity.id

        yield CreateDatabaseRequest(
            name="awesome-database",
            service=EntityReference(
                id=service_id,
                type="databaseService",
            ),
        )

        database_entity: Database = self.metadata.get_by_name(
            entity=Database, fqn.build(
                    self.metadata,
                    entity_type=Database,
                    service_name=self.context.database_service.name.__root__,
                    database_name="awesome-database",
                )
        )
        database_id = database_entity.id

        yield CreateDatabaseSchemaRequest(
            name="awesome-schema",
            description="description",
            database=EntityReference(id=database_id, type="database"),
        )

        database_schema_entity: DatabaseSchema = self.metadata.get_by_name(
            entity=DatabaseSchema, fqn.build(
                    self.metadata,
                    entity_type=DatabaseSchema,
                    service_name=self.context.database_service.name.__root__,
                    database_name="awesome-database",
                    schema_name="awesome-schema"
                )
        )
        database_schema_id = database_schema_entity.id

        yield CreateTableRequest(
            name="awesome-table",
            description="description",
            columns="columns",
            databaseSchema=EntityReference(
                id=database_schema_id, type="databaseSchema"
            ),
            tableConstraints=table.get("tableConstraints"),
            tableType=table["tableType"],
        )

    def close(self):
        pass

    def get_status(self):
        return self.status

As a consumer of Openmetadata-ingestion package, You can to add your custom connectors within the same namespace but in a different package repository.


Here is the situation

├─my_code_repository_package
  ├── src
      ├── my_other_relevant_code_package
      ├── metadata
      │   └── ingestion
      │       └── source
      │        └── database
      │         └── my_awesome_connector.py
      └── setup.py
├── openmetadata_ingestion
  ├── src
      ├── metadata
      │   └── ingestion
      │       └── source
      │        └── database
      │         └── existingSource1
      |         └── existingSource2
      |         └── ....
      └── setup.py

If you want my_awesome_connector.py to build as a source and run as a part of workflows defined in openmetadata_ingestion below are the steps.


First add your coustom project in PyCharm.

Add project in pycharm

Now Go to IDE and Project Settings in PyCharm, inside that go to project section, and select python interpreter, Select virtual environment created for the project as python interpreter

Select interpreter in pycharm

Now apply and okay that interpreter

Select interpreter in pycharm

Still have questions?

You can take a look at our Q&A or reach out to us in Slack

Was this page helpful?

editSuggest edits