Feature Proposal: `CloudWorkspace.get_custom_source()`

Open aaronsteers opened this issue 10 months ago • 3 comments

Challenge

Testing custom connectors in PyAirbyte is currently a manual process. Whenever the source definition changes, the updated YAML has to be copied and pasted by hand into the runtime environment where PyAirbyte is running.

Proposal

Add to CloudWorkspace:

  • CloudWorkspace.custom_sources: list[CustomSourceConnector] - a list of custom sources defined in this workspace.
  • CloudWorkspace.get_custom_source(*, definition_id: str | None = None, name: str | None = None) -> CustomSourceConnector - Gets a custom connector by ID or by name. The caller should provide one or the other, but not both.
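
The "one or the other, but not both" rule could be enforced with a small validation step before any lookup happens. This is a minimal sketch only: `resolve_custom_source_lookup` is a hypothetical helper made up for illustration and is not part of PyAirbyte.

```python
from __future__ import annotations


def resolve_custom_source_lookup(
    *,
    definition_id: str | None = None,
    name: str | None = None,
) -> tuple[str, str]:
    """Return which field to look up by, and the value to match.

    Hypothetical helper for illustration only; not an existing PyAirbyte API.
    """
    if (definition_id is None) == (name is None):
        # Either both arguments were given, or neither was.
        raise ValueError("Provide exactly one of `definition_id` or `name`.")
    if definition_id is not None:
        return ("definition_id", definition_id)
    assert name is not None  # Guaranteed by the check above.
    return ("name", name)
```

Failing fast here keeps an ambiguous call, such as passing both an ID and a name that point to different connectors, from silently picking one of them.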

Pseudocode

CustomSourceConnector might have a definition something like this:

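from dataclasses import dataclass

from airbyte import Source
from airbyte.cloud import CloudWorkspace


@dataclass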
class CustomSourceConnector:
    workspace: CloudWorkspace
    name: str
    manifest: dict
    version: int | None
    definition_id: str

    def as_local_source(
        self,
        *,
        config: dict | None = None,
        config_overlay: dict | None = None,
    ) -> Source:
        """Return a local source object which can be executed locally.

        If `config` is provided, it will replace the Cloud config.
        If `config_overlay` is provided, it will be overlaid on top of the Cloud config.

        Note: By design, PyAirbyte cannot retrieve secrets from the Cloud API endpoints.
        Any secret config parameters will be returned as `******` and will need to be
        replaced using a config overlay.
        """
        ...
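
The config handling described in the docstring above could look roughly like the sketch below. Everything here is an assumption for illustration: `resolve_local_config` and `MASKED_SECRET` are hypothetical names, the overlay is treated as a shallow top-level merge, and when both `config` and `config_overlay` are given the overlay is applied on top of `config`. The proposal does not specify any of these details.

```python
from __future__ import annotations

from typing import Any

# Placeholder value that, per the docstring above, stands in for secrets.
MASKED_SECRET = "******"


def resolve_local_config(
    cloud_config: dict[str, Any],
    config: dict[str, Any] | None = None,
    config_overlay: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Combine the Cloud config with local overrides (hypothetical helper)."""
    # A full `config` replaces the Cloud config entirely.
    base = dict(config) if config is not None else dict(cloud_config)
    # Assumption: the overlay is a shallow merge over top-level keys only.
    merged = {**base, **(config_overlay or {})}
    # Only top-level values are checked for the masked placeholder.
    still_masked = sorted(k for k, v in merged.items() if v == MASKED_SECRET)
    if still_masked:
        raise ValueError(
            f"Secrets are still masked; supply them via config_overlay: {still_masked}"
        )
    return merged
```

Raising on leftover `******` values is one possible design choice: it surfaces a missing secret before the connector runs, instead of as an authentication failure during `check()`.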

Usage Example A

We pass the manifest to get_source().

import airbyte as ab

my_workspace = ab.CloudWorkspace(
    client_id=...,
    client_secret=...,
)

my_source_definition = my_workspace.get_custom_source(name="My Test")
my_source = ab.get_source(
    "source-my-test",
    # `source_manifest` is the existing get_source() argument for passing a manifest.
    source_manifest=my_source_definition.manifest,
    config={...},
)

# Now we can work on it like a normal local source...
my_source.check()
my_source.read()

Usage Example B

We use the as_local_source() method to get a local Source object.

import airbyte as ab

my_workspace = ab.CloudWorkspace(
    client_id=...,
    client_secret=...,
)

my_source_definition = my_workspace.get_custom_source(name="My Test")
my_source = my_source_definition.as_local_source(config_overlay={"password": ...})

# Now we can work on it like a normal local source...
my_source.check()
my_source.read()

aaronsteers • Jan 15 '25 00:01