airbyte-api-python-sdk
airbyte-api-python-sdk copied to clipboard
when executing stream_details = s.streams.get_stream_properties(req)
line 697, in unmarshal_json raise AttributeError( AttributeError: unable to unmarshal
reproduce - install latest airbyte helm. setup GITHUB and Snowflake connection
run this code -
from airbyte import Airbyte
from airbyte.connections import shared
from airbyte.models import errors, operations, shared
from airbyte.models.operations import ListConnectionsRequest
import airbyte
from airbyte.models import shared
s = Airbyte(server_url="http://127.0.0.1:8006/v1",
security=shared.Security(
basic_auth=shared.SchemeBasicAuth(
password="<YOUR_PASSWORD_HERE>",
username="<YOUR_USERNAME_HERE>",
),
),)
r = s.connections.list_connections(operations.ListConnectionsRequest())
for connection_id in r.raw_response.json()['data']:
print(connection_id['connectionId'])
cr = s.connections.get_connection(operations.GetConnectionRequest(connection_id['connectionId']))
req = operations.GetStreamPropertiesRequest(
destination_id=cr.raw_response.json()['destinationId'],
source_id=cr.raw_response.json()['sourceId'],
ignore_cache=True,
)
stream_details = s.streams.get_stream_properties(req)