
[EPIC] Skip stream syncing when stream is unavailable

Open pedroslopez opened this issue 2 years ago • 2 comments

As a step towards https://github.com/airbytehq/airbyte/issues/14906 and the stream availability proposal outlined in this doc, we need a way for connectors to declare whether a stream is available. Currently, each connector handles this on its own and the behavior is not standardized, so this issue is about adding functionality to the CDK that makes it easy for connector developers to do the right thing. This ticket will use that functionality to skip unavailable streams when syncing; it can later be used to set unavailability info on the stream catalog once the protocol has been updated to support it.

Implementation proposal

I propose adding two new methods to the CDK, one at the source level and one at the stream level:

  • AbstractSource.get_stream_availability_data(): returns a Python dictionary with any data the connector may need to determine stream availability. For example, it could fetch the authorized scopes for the API key in use, or issue a request to determine the account level (e.g. manager vs. regular account type).
  • Stream.is_unavailable(availability_data): receives the source-level availability data returned by the previous method and returns a tuple (is_unavailable, unavailability_reason), e.g. (True, "This stream is only available for manager accounts") or (False, None).

The CDK can use these methods to automatically skip these streams and log a message when syncing. Some pseudo-code for AbstractSource.read():

AbstractSource.read:
    availability_data = self.get_stream_availability_data()
    for each configured stream, before calling the stream's read:
        is_unavailable, unavailable_reason = stream_instance.is_unavailable(availability_data)
        if is_unavailable:
            log(f"Skipped syncing stream {stream_instance.name}: {unavailable_reason}")
        else:
            do the sync

Implementation example (hubspot):

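class Source:
    def get_stream_availability_data(self):
        # fetch() stands in for the request that returns the scopes granted to the current credentials
        granted_scopes = fetch()
        return {"granted_scopes": granted_scopes}

class Stream:
    def is_unavailable(self, availability_data):
        # The stream is available only if its required scope was granted
        if "my_scope" in availability_data.get("granted_scopes", []):
            return False, None
        return True, "my_scope not granted"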
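
To make the proposal concrete, here is a minimal, self-contained sketch of how the two methods and the skip logic in read() could fit together. The Source and Stream classes below are toy stand-ins, not the real airbyte_cdk classes, and the stream names, the required_scope attribute, and the scope strings are assumptions made up for illustration.

```python
import logging
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple

logger = logging.getLogger("availability_sketch")


class Stream:
    """Toy stand-in for a CDK stream (hypothetical, not airbyte_cdk's Stream)."""

    name = "stream"
    required_scope: Optional[str] = None  # assumption: a single scope gates the stream

    def is_unavailable(self, availability_data: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        # Returns (is_unavailable, unavailability_reason), as in the proposal.
        if self.required_scope is None:
            return False, None
        if self.required_scope in availability_data.get("granted_scopes", []):
            return False, None
        return True, f"{self.required_scope} not granted"

    def read_records(self) -> Iterable[Dict[str, Any]]:
        return []


class Contacts(Stream):
    name = "contacts"
    required_scope = "contacts.read"  # hypothetical scope name

    def read_records(self) -> Iterable[Dict[str, Any]]:
        return [{"id": 1}, {"id": 2}]


class Deals(Stream):
    name = "deals"
    required_scope = "deals.read"  # hypothetical scope name

    def read_records(self) -> Iterable[Dict[str, Any]]:
        return [{"id": 10}]


class Source:
    """Toy stand-in for AbstractSource (hypothetical)."""

    def __init__(self, streams: List[Stream], granted_scopes: List[str]):
        self._streams = streams
        self._granted_scopes = granted_scopes

    def get_stream_availability_data(self) -> Dict[str, Any]:
        # A real connector would call the API here; the sketch returns fixed data.
        return {"granted_scopes": list(self._granted_scopes)}

    def read(self) -> Iterator[Tuple[str, Dict[str, Any]]]:
        # Availability data is fetched once and shared by every stream check.
        availability_data = self.get_stream_availability_data()
        for stream in self._streams:
            is_unavailable, reason = stream.is_unavailable(availability_data)
            if is_unavailable:
                logger.warning("Skipped syncing stream %s: %s", stream.name, reason)
                continue
            for record in stream.read_records():
                yield stream.name, record
```

With Source([Contacts(), Deals()], granted_scopes=["contacts.read"]), iterating over read() yields only the contacts records and logs a skip message for deals.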

Acceptance criteria

  • CDK exposes new methods to determine whether a stream is available
  • CDK auto skips streams if they are not available

pedroslopez avatar Sep 13 '22 14:09 pedroslopez

  • For sources that don't have an endpoint to determine permissions, the stream-level availability method can just do the first read(). It's OK for now that this request happens again when we actually start reading the stream, since we can cache it later.

  • Could we provide a default implementation that calls read on the connector and catches a 403?

  • Define a set of status codes that mark a stream as unavailable by default (good for low-code) and use it as the default implementation.

  • The HTTP stream gets a new method that returns the unavailable status codes.

  • Do a read request and mark the stream as unavailable if the response has one of these status codes.

  • Isolate the concept of stream availability so it can be swapped out.

  • An AvailabilityStrategy can be passed to the streams.

  • Available scopes can be determined at the strategy level and used when calling from the stream.

  • An availability strategy can also be defined for status codes.

  • Look into a default implementation.
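
One way the notes above could translate into code is sketched below. Only the name AvailabilityStrategy comes from the notes; the method signatures, the StatusCodeAvailabilityStrategy and ScopeAvailabilityStrategy classes, the toy HttpStream, and the injected send_first_request callable are all assumptions for illustration, not the CDK's actual API.

```python
from abc import ABC, abstractmethod
from typing import Callable, Iterable, Optional, Set, Tuple


class AvailabilityStrategy(ABC):
    """Hypothetical interface: the notes name the concept, not its signature."""

    @abstractmethod
    def is_unavailable(self, stream: "HttpStream") -> Tuple[bool, Optional[str]]:
        """Return (is_unavailable, unavailability_reason)."""


class StatusCodeAvailabilityStrategy(AvailabilityStrategy):
    """Default idea from the notes: do a first read and inspect the status code."""

    def is_unavailable(self, stream: "HttpStream") -> Tuple[bool, Optional[str]]:
        status = stream.first_request_status()
        if status in stream.unavailable_status_codes():
            return True, f"first request returned HTTP {status}"
        return False, None


class ScopeAvailabilityStrategy(AvailabilityStrategy):
    """Scopes are resolved once at the strategy level and reused for each stream."""

    def __init__(self, granted_scopes: Iterable[str]):
        self._granted_scopes = set(granted_scopes)

    def is_unavailable(self, stream: "HttpStream") -> Tuple[bool, Optional[str]]:
        missing = sorted(set(stream.required_scopes) - self._granted_scopes)
        if missing:
            return True, "missing scopes: " + ", ".join(missing)
        return False, None


class HttpStream:
    """Toy stand-in for an HTTP stream (not the real airbyte_cdk HttpStream)."""

    def __init__(
        self,
        name: str,
        send_first_request: Callable[[], int],  # assumption: returns an HTTP status code
        availability_strategy: Optional[AvailabilityStrategy] = None,
        required_scopes: Iterable[str] = (),
    ):
        self.name = name
        self._send_first_request = send_first_request
        self.required_scopes = tuple(required_scopes)
        # The strategy is swappable; status-code checking is the default.
        self.availability_strategy = availability_strategy or StatusCodeAvailabilityStrategy()

    def unavailable_status_codes(self) -> Set[int]:
        # Streams can override this to declare which statuses mean "unavailable".
        return {403}

    def first_request_status(self) -> int:
        return self._send_first_request()

    def is_unavailable(self) -> Tuple[bool, Optional[str]]:
        return self.availability_strategy.is_unavailable(self)
```

Keeping the check behind a strategy object means a connector with a permissions endpoint can pass a scope-based strategy, while connectors without one fall back to the status-code default without changing the stream class.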

pedroslopez avatar Sep 13 '22 18:09 pedroslopez

From https://github.com/airbytehq/airbyte/issues/3268:

grooming notes:

  • We may even want to think of these as threads / futures rather than a for loop that accumulates failures. Programmatically, it may be better.
  • We should report all the streams that failed.
  • We should probably output one trace message per failure, and eventually the platform should handle multiple failures. If they are the same, they get grouped in Sentry. If they are not, they are split into separate incidents.
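
As a rough illustration of the futures idea in the grooming notes, the sketch below runs each stream sync in a thread pool and produces one message per failed stream instead of stopping at the first error. The sync_all function and the message dictionary shape are hypothetical; in particular, the dictionaries are not the Airbyte protocol's trace message format.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List


def sync_all(streams: Dict[str, Callable[[], int]], max_workers: int = 4) -> List[dict]:
    """Run every stream sync and return one trace-like message per failed stream.

    `streams` maps a stream name to a callable that performs the sync
    (an assumption for this sketch; a real source would pass stream objects).
    """
    failures: List[dict] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(sync): name for name, sync in streams.items()}
        for future in as_completed(futures):
            name = futures[future]
            try:
                future.result()  # re-raises whatever the sync raised
            except Exception as exc:
                # One message per failure, so every failed stream is reported.
                failures.append(
                    {
                        "type": "TRACE",  # hypothetical message shape
                        "stream": name,
                        "error": f"{type(exc).__name__}: {exc}",
                    }
                )
    # Sort for deterministic output, since completion order varies.
    return sorted(failures, key=lambda message: message["stream"])
```

A failure in one stream does not prevent the others from running, and the caller receives the full list of failures to emit or group as it sees fit.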

TODO:

  • [ ] Create a ticket for the platform to handle multiple failure reasons and report all of them to Sentry. Keep in mind that there is a UI component here as well: it only shows one failure reason today.

evantahler avatar Sep 26 '22 23:09 evantahler