airbyte-api-python-sdk
The SDK doesn't seem to work for custom sources/destinations: source_type is treated as required, even though it is optional in the actual API
Script Used To Test
import airbyte
from airbyte.models import operations, shared

# Client pointed at the local Airbyte API, using basic auth
s = airbyte.Airbyte(
    security=shared.Security(
        server_url="http://localhost:8006/v1",
        basic_auth=shared.SchemeBasicAuth(
            password=<pwd>,
            username=<username>,
        ),
    ),
)

# Fetch a source that was created from a custom connector
req = operations.GetSourceRequest(
    source_id=<custom-source-id>,
)
res = s.sources.get_source(req)

if res.source_response is not None:
    print("source fetched")
Error Observed
Traceback (most recent call last):
File "/mnt/c/ACA-Group/alpha-airbyte/web/api/source/test.py", line 173, in
Additionally
The SDK expects source_type to always be present in SourceResponse, which is true only for non-custom connectors.
Hitting the API directly via Postman, however, works without issues (http://localhost:8006/v1/sources/
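As a stopgap, the same call Postman makes can be reproduced without going through the SDK models. The snippet below is only a sketch using the standard library: the helper name build_get_source_request is hypothetical, and it assumes the endpoint is GET /sources/{source_id} under the server URL and that basic auth is enabled, as in the test script above. It only builds the request and does not send it.

```python
import base64
import urllib.request


def build_get_source_request(base_url: str, source_id: str,
                             username: str, password: str) -> urllib.request.Request:
    """Build (but do not send) a GET request for a single source.

    Assumption: the source lives at <base_url>/sources/<source_id>
    and the server accepts HTTP basic auth.
    """
    # Basic auth is "username:password", base64-encoded.
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    url = f"{base_url.rstrip('/')}/sources/{source_id}"
    request = urllib.request.Request(url, method="GET")
    request.add_header("Authorization", f"Basic {token}")
    request.add_header("Accept", "application/json")
    return request
```

The returned object can be passed to urllib.request.urlopen and the body decoded with json.load, which yields a plain dict and so sidesteps the SourceResponse dataclass entirely.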
It also looks like the dataclass for the source response is restrictive and tailor-made for non-custom connectors: source_type is not defined as Optional[str], and the configuration is a Union of the predefined sources only, so it cannot accommodate custom sources with custom configurations. See https://github.com/airbytehq/airbyte-api-python-sdk/blob/main/src/airbyte/models/shared/sourceresponse.py#L207
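To illustrate the kind of relaxation being asked for, here is a minimal sketch of a response model that tolerates custom connectors. It is not the SDK's code: RelaxedSourceResponse and parse_source are hypothetical names, and the JSON keys (sourceId, name, workspaceId, sourceType, configuration) are assumed from what the API returns for a source.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class RelaxedSourceResponse:
    """Hypothetical stand-in for SourceResponse, not the SDK's class."""
    source_id: str
    name: str
    workspace_id: str
    # A free-form dict instead of a Union of known connector configs,
    # so custom connector configurations are accepted too.
    configuration: Dict[str, Any]
    # Optional, because custom connectors may not report a source type.
    source_type: Optional[str] = None


def parse_source(payload: Dict[str, Any]) -> RelaxedSourceResponse:
    """Map a raw JSON payload onto the relaxed model (assumed key names)."""
    return RelaxedSourceResponse(
        source_id=payload["sourceId"],
        name=payload["name"],
        workspace_id=payload["workspaceId"],
        configuration=payload.get("configuration", {}),
        source_type=payload.get("sourceType"),
    )
```

With a model shaped like this, a payload that lacks sourceType parses with source_type left as None instead of raising, while payloads from built-in connectors still carry their type through.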