PyAirbyte
PyAirbyte copied to clipboard
Feature Proposal: `CloudWorkspace.get_custom_source()`
Challenge
As of today, it's a manual process to test custom connectors in PyAirbyte. Whenever the source definition changes, there's a manual step needed to copy-paste the yaml into the runtime environment where you are running PyAirbyte.
Proposal
Add to CloudWorkspace:
CloudWorkspace.custom_sources:list[CustomSourceConnector]- a list of custom sources defined in this workspace.CloudWorkspace.get_custom_source(*, definition_id: str | None, name: str) -> CustomSourceConnector- Allow getting custom connector by ID or by name. User should provide one or the other, but not both.
Pseudocode
CustomSourceConnector might have a definition something like this:
@dataclass
class CustomSourceConnector:
workspace: CloudWorkspace
name: str
manifest: dict
version: int | None
definition_id: str
def as_local_source(
self,
/,
config: dict | None = None,
config_overlay: dict | None = None,
) -> Source:
"""Return a local source object which can be executed locally.
If `config` is provided, it will replace the Cloud config.
If `config_overlay` is provided, it will be overlayed on top of the Cloud config.
Note: By design, PyAirbyte cannot retrieve secrets from the Cloud API endpoints.
Any secret config parameters will be returned as `******` and will need to be
replaced using a config overlay.
"""
...
Usage Example A
We pass the manifest to get_source().
import airbyte as ab
my_workspace = ab.CloudWorkspace(
client_id=...,
client_secret=...,
)
my_source_definition = my_workspace.get_custom_source(name="My Test")
my_source = ab.get_source(
"source-my-test",
declarative_manifest=my_source_definition.manifest,
config={...},
)
# Now we can work on it like a normal local source...
my_source.check()
my_source.read()
Usage Example B
We use the as_local_source() method to get a local Source object.
import airbyte as ab
my_workspace = ab.CloudWorkspace(
client_id=...,
client_secret=...,
)
my_source_definition = my_workspace.get_custom_source(name="My Test")
my_source = my_source_definition.as_local_source(config_overlay={"password": ...})
# Now we can work on it like a normal local source...
my_source.check()
my_source.read()