airbyte
airbyte copied to clipboard
[RFR for API Sources] New Python interfaces to support resumable full refresh
What
Primarily updates how the Stream class performs a read. The big change is around how we resolve what the next partition of records to retrieve is. Because resumable full refresh operates under the paradigm of an unbounded set of pages (unlike incremental partitioned time windows), we need to change how we determine the next slice.
The primary changes to the flow at a high level are:
- Consolidates a lot of the branching logic to be agnostic of the incoming catalog's sync mode which incorporated parts of Ella's changes in https://github.com/airbytehq/airbyte/pull/36999
- Instead of
stream_slices()
being the mechanism for determining the next partition of records to retrieve. I've introduced the concept of aCheckpointReader
whose type is instantiated based on the Stream's implementation - The reader interprets and passed state back to the ConnectorStateManager by observing the stream's current state.
- The CDK is now "state aware" meaning that it actually reads stream state instead of just passing it back to the platform like a block box. I'll go into this in more detail below.
How
Some of the major design changes in the review are:
- The new
CheckpointReader
class which is now the main way a stream determines the next partition of values to read. For incremental this continues to be partitions like time windows. For RFR, this is the next page of records. And for RFR this can be parent records for substreams or a single slice{}
. - Deprecating
IncrementalMixin
in favor ofStateMixin
since state is used by RFR streams which are not incremental. This is a better name, but I kept the old one for backwards compatibility reasons - Changing the default
Stream.stream_slices()
implementation from [None] to [{}]. None is now the indicator to stop iterating over slices - Adding the
supports_checkpointing
field to streams. It’s needed for two areas. We need to surface this value to the catalog. AND we need this to be overridable because declarative low-code sources delineation for checkpointing differs from python sources.
This PR does not implement the work for a substream that requires resumable full refresh. I have a sketched out interface to see if its possible which it does appear to be. But substream state management for RFR becomes quite convoluted due to the issue I'll go into below.
Making the CDK and Stream
class read directly from the connector managed state
This is arguably the most controversial DX and design change compared to before RFR. It has some impact on connector developers. In order for RFR to function using the current read_records()
method, we need some way of communicating state from the the specific connector implementation back to the CDK. To do so, we expect the developer to manage Stream.state
. And with RFR we now read the input to decide what to do next. For example:
if state:
{ "pageNumber": 23 }
then,
continue syncing.
if state:
{}
then,
stop syncing no more pages.
This is a relatively simple example, but it does illustrate that the developer needs to know to emit {}
to stop RFR paging. In hubspot updating state is done with self.state = self.next_page_token(response) or {}
. But it feels like not an ideal precedent that a developer needs to have a general awareness of how the checkpoint reader works in order to successfully implement RFR. And this is coupled with the CheckpointReader
being mostly an internal CDK implementation detail that developers shouldn't need to think of.
As mentioned earlier, substream RFR streams which would need per-partition cursors requires much more careful reading of the state object for specific structure which is indicative that state as an unstructured map is not the right data type.
This is just for connectors that implement the legacy Python CDK, and since state is managed internally by low-code connectors, I think its fair to also say from the 80:20 rule, we are aiming for a more ideal interface for low-code which makes up a majority of our connectors vs legacy Python.
Alternative:
I did also look into incremental vs. RFR streams having different versions of the read_records()
method. For incremental/full refresh, they would have the normal read_records() -> iterable
. And for RFR read_records() -> iterable + updated stream state
. This however felt like a step in the wrong direction because we're moving back into two different runtime flows depending on sync type.
A structured state
class:
This feels like the more appropriate long term solution. If we have a state interface that handles interpreting state and explicitly communicate what to do with state and what the CDK should emit back to the platform. This is well outside what we can do for the project, but something that I thought about.
Review guide
-
checkpoint_reader.py
-
core.py
-
declarative_stream.py
-
abstract_source.py
- test files
User Impact
noop
Can this PR be safely reverted and rolled back?
- [x] YES 💚
- [ ] NO ❌
The latest updates on your projects. Learn more about Vercel for Git ↗︎
1 Ignored Deployment
Name | Status | Preview | Comments | Updated (UTC) |
---|---|---|---|---|
airbyte-docs | ⬜️ Ignored (Inspect) | Visit Preview | May 6, 2024 9:35pm |
ran a few local connectors-ci
runs of hubspot, a few low-code connectors to see that tests passed before the merge