
[RFR for API Sources] New Python interfaces to support resumable full refresh

Open • brianjlai opened this issue 10 months ago • 1 comment

What

This PR primarily updates how the Stream class performs a read. The biggest change is how we resolve the next partition of records to retrieve. Because resumable full refresh operates over an unbounded set of pages (unlike incremental's partitioned time windows), we need to change how we determine the next slice.

The primary changes to the flow at a high level are:

  • Consolidates much of the branching logic so that it is agnostic of the incoming catalog's sync mode. This incorporates parts of Ella's changes in https://github.com/airbytehq/airbyte/pull/36999
  • Instead of using stream_slices() as the mechanism for determining the next partition of records to retrieve, I've introduced the concept of a CheckpointReader, whose type is instantiated based on the Stream's implementation.
  • The reader interprets state and passes it back to the ConnectorStateManager by observing the stream's current state.
  • The CDK is now "state aware", meaning that it actually reads stream state instead of just passing it back to the platform like a black box. I'll go into this in more detail below.
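A minimal sketch of what a sync-mode-agnostic read loop could look like, assuming a hypothetical reader that exposes `next()` (returns the next slice, or `None` to stop), `observe(state)`, and `get_checkpoint()`. These method names, `read_stream`, `ListReader`, and the `emit_state` callback standing in for the ConnectorStateManager are illustrative assumptions, not the CDK's actual API.

```python
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Protocol

Slice = Mapping[str, Any]
State = Mapping[str, Any]


class Reader(Protocol):
    """Hypothetical checkpoint-reader interface (illustrative, not the CDK's API)."""

    def next(self) -> Optional[Slice]: ...

    def observe(self, state: State) -> None: ...

    def get_checkpoint(self) -> Optional[State]: ...


class ListReader:
    """Toy reader: walks a fixed list of slices and remembers the last observed state."""

    def __init__(self, slices: List[Slice]) -> None:
        self._slices = iter(slices)
        self._state: Optional[State] = None

    def next(self) -> Optional[Slice]:
        return next(self._slices, None)  # None signals "stop iterating"

    def observe(self, state: State) -> None:
        self._state = dict(state)  # snapshot so later mutations don't leak in

    def get_checkpoint(self) -> Optional[State]:
        return self._state


def read_stream(
    reader: Reader,
    read_records: Callable[[Slice], Iterable[Mapping[str, Any]]],
    get_state: Callable[[], State],
    emit_state: Callable[[State], None],  # stand-in for the ConnectorStateManager
) -> List[Mapping[str, Any]]:
    """One loop for every sync mode: the reader, not the catalog, decides what to read next."""
    records: List[Mapping[str, Any]] = []
    while (stream_slice := reader.next()) is not None:
        records.extend(read_records(stream_slice))
        # Observe the stream's current state after each slice, then checkpoint it.
        reader.observe(get_state())
        checkpoint = reader.get_checkpoint()
        if checkpoint is not None:
            emit_state(checkpoint)
    return records


# Usage: a hypothetical stream whose state records the last slice it read.
stream_state: Dict[str, Any] = {}


def read_day(stream_slice: Slice) -> List[Mapping[str, Any]]:
    stream_state["last_day"] = stream_slice["day"]
    return [{"day": stream_slice["day"]}]


emitted: List[State] = []
records = read_stream(ListReader([{"day": 1}, {"day": 2}]), read_day, lambda: stream_state, emitted.append)
```

The point of the design is that the loop body never branches on sync mode; swapping the reader is what changes the behavior.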

How

Some of the major design changes in the review are:

  • The new CheckpointReader class, which is now the main way a stream determines the next partition of values to read. For incremental, this continues to be partitions such as time windows. For RFR, this is the next page of records. And for regular full refresh, this can be parent records for substreams or a single slice {}.
  • Deprecating IncrementalMixin in favor of StateMixin, since state is also used by RFR streams, which are not incremental. StateMixin is a more accurate name, but I kept the old one for backwards compatibility.
  • Changing the default Stream.stream_slices() implementation from [None] to [{}]. None is now the indicator to stop iterating over slices.
  • Adding the supports_checkpointing field to streams. It's needed in two places: we need to surface this value to the catalog, and we need it to be overridable because declarative low-code sources determine checkpointing support differently from Python sources.
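The kinds of partition listed above could be modeled as separate readers. This is a simplified sketch under assumptions: the class names echo the concepts in this PR, but the `next`/`observe` methods and all internals are hypothetical, not the CDK's actual CheckpointReader implementation. It also shows `None` as the stop signal and `[{}]` as the default single slice.

```python
from typing import Any, Iterable, Iterator, Mapping, Optional

Slice = Mapping[str, Any]


class FullRefreshCheckpointReader:
    """Plain full refresh: iterate stream_slices() once (default [{}])."""

    def __init__(self, stream_slices: Iterable[Slice]) -> None:
        self._slices: Iterator[Slice] = iter(stream_slices)

    def next(self) -> Optional[Slice]:
        return next(self._slices, None)  # None means "no more slices"

    def observe(self, state: Mapping[str, Any]) -> None:
        pass  # plain full refresh does not checkpoint


class IncrementalCheckpointReader(FullRefreshCheckpointReader):
    """Incremental: still slice-driven (e.g. time windows), but state is kept."""

    def __init__(self, stream_slices: Iterable[Slice]) -> None:
        super().__init__(stream_slices)
        self.state: Mapping[str, Any] = {}

    def observe(self, state: Mapping[str, Any]) -> None:
        self.state = state


class ResumableFullRefreshCheckpointReader:
    """RFR: the next 'slice' is the stream's own state (e.g. the next page token).

    An empty state ({}) observed after a page means the stream is out of pages.
    """

    def __init__(self, initial_state: Mapping[str, Any]) -> None:
        self._state: Mapping[str, Any] = initial_state
        self._started = False

    def next(self) -> Optional[Slice]:
        if not self._started:
            self._started = True
            return dict(self._state)  # start fresh ({}) or resume from incoming state
        return dict(self._state) if self._state else None

    def observe(self, state: Mapping[str, Any]) -> None:
        self._state = state


# Usage: simulate a stream that sets its state to the next page token, then {}.
rfr = ResumableFullRefreshCheckpointReader({})
pages = []
for state_after_page in [{"pageNumber": 2}, {"pageNumber": 3}, {}]:
    pages.append(rfr.next())
    rfr.observe(state_after_page)
pages.append(rfr.next())
```

Because RFR pages are unbounded, that reader cannot precompute its slices the way the time-window reader can; it can only look at the latest state to decide whether another page exists.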

This PR does not implement resumable full refresh for substreams. I have sketched out an interface to check whether it's possible, and it does appear to be. However, substream state management for RFR becomes quite convoluted due to the issue I'll go into below.

Making the CDK and Stream class read directly from the connector managed state

This is arguably the most controversial DX and design change compared to the pre-RFR behavior, and it has some impact on connector developers. For RFR to work with the current read_records() method, we need some way to communicate state from the specific connector implementation back to the CDK. To do so, we expect the developer to manage Stream.state, and with RFR the CDK now reads that state to decide what to do next. For example:

If state is { "pageNumber": 23 }, continue syncing.

If state is {}, stop syncing: there are no more pages.

This is a relatively simple example, but it illustrates that the developer needs to know to emit {} to stop RFR paging. In HubSpot, state is updated with self.state = self.next_page_token(response) or {}. However, this sets a less-than-ideal precedent: a developer needs a general awareness of how the checkpoint reader works in order to implement RFR successfully, even though the CheckpointReader is mostly an internal CDK implementation detail that developers shouldn't need to think about.
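To make that contract concrete, here is a hypothetical legacy-style stream that sets `self.state = self.next_page_token(response) or {}` after each page, following the HubSpot pattern described above. `FakePagedStream`, the in-memory `PAGES` data, and the driver loop are stand-ins for illustration only, not HubSpot's actual code or the CDK's `Stream` class.

```python
from typing import Any, Dict, List, Mapping, Optional

# Hypothetical API responses: page number -> records and the next page number (or None).
PAGES: Dict[int, Dict[str, Any]] = {
    1: {"results": [{"id": 1}, {"id": 2}], "next": 2},
    2: {"results": [{"id": 3}], "next": None},
}


class FakePagedStream:
    """Hypothetical RFR stream that tracks its own paging state."""

    def __init__(self) -> None:
        self.state: Mapping[str, Any] = {}

    def next_page_token(self, response: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
        nxt = response["next"]
        return {"pageNumber": nxt} if nxt is not None else None

    def read_records(self, stream_slice: Mapping[str, Any]) -> List[Mapping[str, Any]]:
        page = stream_slice.get("pageNumber", 1)  # {} means start from the first page
        response = PAGES[page]
        # The developer must know that {} is the signal to stop paging.
        self.state = self.next_page_token(response) or {}
        return response["results"]


# Driver standing in for the CDK: keep reading while the stream reports a non-empty state.
stream = FakePagedStream()
records: List[Mapping[str, Any]] = []
stream_slice: Mapping[str, Any] = {}
while True:
    records.extend(stream.read_records(stream_slice))
    if not stream.state:  # empty state: no more pages
        break
    stream_slice = stream.state
```

If a developer forgot the `or {}` and left a stale token in `self.state`, the driver would keep requesting the same page, which is exactly the hidden coupling described above.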

As mentioned earlier, RFR substreams, which would need per-partition cursors, require much more careful reading of the state object for a specific structure. This indicates that state as an unstructured map is not the right data type.
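As a hypothetical illustration of that fragility: per-partition RFR state for a substream might be nested under parent keys. The shape below (`states`, `partition`, `parent_id`, `cursor`) is an assumption made for this sketch, and `page_for_partition` is an invented helper showing the defensive parsing an unstructured map forces on every reader of the state.

```python
from typing import Any, Dict, Optional


def page_for_partition(state: Dict[str, Any], parent_id: str) -> Optional[Dict[str, Any]]:
    """Return the saved page token for one parent partition, or None if absent.

    Every key and type is checked by hand, because state is just a dict.
    """
    entries = state.get("states")
    if not isinstance(entries, list):
        return None
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        partition = entry.get("partition")
        if isinstance(partition, dict) and partition.get("parent_id") == parent_id:
            cursor = entry.get("cursor")
            return cursor if isinstance(cursor, dict) else None
    return None


# Hypothetical state: parent "a" is mid-pagination, parent "b" has finished ({}).
state = {
    "states": [
        {"partition": {"parent_id": "a"}, "cursor": {"pageNumber": 4}},
        {"partition": {"parent_id": "b"}, "cursor": {}},
    ]
}
```

Note how a missing partition (`None`) and a finished partition (`{}`) are distinguished only by convention, which is the kind of ambiguity that makes unstructured state hard to interpret safely.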

This only affects connectors built on the legacy Python CDK. Since state is managed internally by low-code connectors, which make up the majority of our connectors, I think it's also fair to apply the 80:20 rule here: we are aiming for a more ideal interface for low-code rather than for legacy Python.

Alternative:

I also looked into giving incremental and RFR streams different versions of the read_records() method. Incremental/full refresh streams would keep the normal read_records() -> iterable, and RFR streams would use read_records() -> iterable + updated stream state. However, this felt like a step in the wrong direction because it moves us back to two different runtime flows depending on sync type.
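For comparison, the rejected alternative might have looked roughly like this: a separate RFR method returning records together with the updated state. The name `read_records_with_state`, the three-page fake data, and the `run` driver are hypothetical. The cost shows up in the driver, which needs a second code path keyed on sync mode.

```python
from typing import Any, Iterable, List, Mapping, Tuple

Record = Mapping[str, Any]
State = Mapping[str, Any]


def read_records(stream_slice: Mapping[str, Any]) -> Iterable[Record]:
    """Incremental / full refresh: records only."""
    return [{"slice": dict(stream_slice)}]


def read_records_with_state(state: State) -> Tuple[List[Record], State]:
    """Hypothetical RFR variant: records plus the updated stream state (3 fake pages)."""
    page = state.get("pageNumber", 1)
    next_state: State = {"pageNumber": page + 1} if page < 3 else {}
    return [{"page": page}], next_state


def run(sync_mode: str) -> List[Record]:
    # Two runtime flows selected by sync type: the split this PR tries to avoid.
    if sync_mode == "resumable_full_refresh":
        out: List[Record] = []
        state: State = {}
        while True:
            records, state = read_records_with_state(state)
            out.extend(records)
            if not state:
                return out
    return list(read_records({}))
```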

A structured state class:

This feels like the more appropriate long-term solution: a state interface that handles interpreting state and explicitly communicates what to do with it and what the CDK should emit back to the platform. This is well outside the scope of this project, but it's something I thought about.
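One possible shape for such an interface, as a hypothetical sketch only (`RfrState`, `from_message`, `advance`, and `to_message` are invented names): the object, rather than the connector developer, decides whether paging is finished and what is emitted back to the platform.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class RfrState:
    """Hypothetical structured state for a paginated RFR stream."""

    next_page: int = 1
    done: bool = False

    @classmethod
    def from_message(cls, raw: Dict[str, Any]) -> "RfrState":
        # Interpret the platform's unstructured map in exactly one place.
        # An empty map means "start from the beginning".
        return cls(next_page=raw.get("pageNumber", 1))

    def advance(self, next_page: Optional[int]) -> "RfrState":
        """Record the outcome of reading one page; None means no more pages."""
        if next_page is None:
            return RfrState(next_page=self.next_page, done=True)
        return RfrState(next_page=next_page)

    def to_message(self) -> Dict[str, Any]:
        # What the CDK would emit back to the platform.
        return {} if self.done else {"pageNumber": self.next_page}
```

In this sketch, the two cases that a bare {} otherwise has to cover ("not started yet" and "finished") get distinct representations, and the developer only reports the next page number.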

Review guide

  1. checkpoint_reader.py
  2. core.py
  3. declarative_stream.py
  4. abstract_source.py
  5. test files

User Impact

noop

Can this PR be safely reverted and rolled back?

  • [x] YES 💚
  • [ ] NO ❌

brianjlai, Apr 19 '24 07:04


Ran a few local connectors-ci runs of HubSpot and a few low-code connectors to confirm that tests passed before merging.

brianjlai, May 06 '24 22:05