JDBC Destinations periodically flush staged data based on time interval (checkpointing)
Part of the Connector Checkpointing Epic
We want to ensure that destinations commit any buffered RECORDS to the destination's _airbyte_raw
tables periodically based on a time interval. This issue is to create shared code for all of our Java destinations that allows for that behavior.
- The time interval for flushing buffered RECORDS should be controlled by an environment variable with a sensible default - every 15 minutes (see the sketch after this list).
- This time interval should be able to be overridden per individual destination if we learn that a specific destination is slowed down by these frequent commits. We will only learn whether every 15 minutes is too frequent once we test...
- We should continue to flush when it makes sense for the destination (e.g. the number of bytes in the buffer is approaching a limit for the destination database)
- We will need to re-publish all of the Java destinations once the above changes are added to the CDK.
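As a rough illustration of the first bullet above, here is a minimal sketch of resolving the flush interval from an environment variable with a 15-minute default and an optional per-destination override. The variable name `BUFFER_FLUSH_PERIOD_MIN` and the override hook are hypothetical, not the actual CDK API:

```java
import java.time.Duration;
import java.util.Optional;

public class FlushIntervalConfig {

  // Sensible default from the issue description: flush every 15 minutes.
  private static final Duration DEFAULT_FLUSH_INTERVAL = Duration.ofMinutes(15);

  /**
   * Resolves the flush interval: a per-destination override (if provided) wins,
   * otherwise the environment variable, otherwise the 15-minute default.
   */
  public static Duration resolve(Optional<Duration> destinationOverride) {
    if (destinationOverride.isPresent()) {
      return destinationOverride.get();
    }
    // Hypothetical env var name, used here only for illustration.
    final String fromEnv = System.getenv("BUFFER_FLUSH_PERIOD_MIN");
    if (fromEnv != null && !fromEnv.isBlank()) {
      return Duration.ofMinutes(Long.parseLong(fromEnv));
    }
    return DEFAULT_FLUSH_INTERVAL;
  }
}
```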
Getting specific, checkpointability for a destination is measured as the time between when a state message is received from the source/platform and the time that the next state message is emitted from the destination, indicating a successful flush.
- The timer starts when the destination receives a state message
- The timer is reset whenever the destination flushes from its temporary storage (flat files) to the "real" destination (flat file is uploaded and the INSERT statement completes). Said another way, the timer can be reset when the state message is emitted back from the destination
Possible implementations (a rough sketch of the second option follows this list):
- A real timer thread that calls the flush periodically
- handles the case when sources are slow or get "stuck" and don't emit records (but are still running)
- Every time a record is received, the timer's start time is checked and then the destination determines if it is time to flush or not
- probably simpler than the timer thread
- depends on the source continuing to send records
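To make the second option concrete, here is a minimal sketch (not the actual CDK code; class and method names are illustrative) of a tracker where the timer starts when the first un-flushed STATE message arrives, is checked on every RECORD, and is reset after a successful flush:

```java
import java.time.Duration;
import java.time.Instant;

public class TimedFlushTracker {

  private final Duration flushInterval;
  private Instant timerStart = null; // set when the first un-flushed STATE message arrives

  public TimedFlushTracker(Duration flushInterval) {
    this.flushInterval = flushInterval;
  }

  /** Called when the destination receives a STATE message: starts the timer if it isn't running. */
  public void onStateMessage() {
    if (timerStart == null) {
      timerStart = Instant.now();
    }
  }

  /** Called on every RECORD message: returns true if the buffered data should be flushed now. */
  public boolean shouldFlushOnRecord() {
    return timerStart != null
        && Duration.between(timerStart, Instant.now()).compareTo(flushInterval) >= 0;
  }

  /** Called after a successful flush (file uploaded, INSERT completed, STATE emitted): resets the timer. */
  public void onFlushCompleted() {
    timerStart = null;
  }
}
```

The drawback noted above still applies: if the source stops emitting records, shouldFlushOnRecord() is never evaluated, which is exactly what the dedicated timer-thread variant would address.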
@evantahler I had questions for @sherifnada about how destination connectors flush data, and it looks like this is the appropriate issue for my topic.
I see that this issue is about flushing the buffer not only on size overflow but also on a timer.
Is it also worth flushing the buffer upon receiving a STATE message from the source connector(s)?
P.S. Flushing by timer would also improve my problem with the amazon-ads connector:
https://github.com/airbytehq/oncall/issues/585
I propose resetting the timer after we receive a RECORD message. In typical cases, the buffer alone does the trick most efficiently. But when the sync is still running and we are not getting any records for a while, it makes sense to flush the buffer while we wait.
Assuming that a source regularly sends state messages, we still want the destination to flush every 15 min in case the destination crashes. If we wait until the Source is complete and then the Destination crashes, we lose all of the data we just sent and have to start over.
The normal buffer size is about 15 MB. If we regularly get records, the buffer is flushed more frequently than every 15 minutes. Adding additional checkpoints will not save us much but will slow down all normal syncs. I see only one negative case where checkpointing can be useful - when we wait for a new record for a long period. If so, we should start our time interval from the last processed record and flush the buffer.
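For illustration only, a sketch of the variant proposed here - reset the timer on every RECORD and flush only after the destination has been idle for the whole interval (class and method names are hypothetical):

```java
import java.time.Duration;
import java.time.Instant;

public class IdleFlushTracker {

  private final Duration idleInterval;
  private Instant lastRecordAt = Instant.now();

  public IdleFlushTracker(Duration idleInterval) {
    this.idleInterval = idleInterval;
  }

  /** Called on every RECORD message: resets the idle timer. */
  public void onRecord() {
    lastRecordAt = Instant.now();
  }

  /** Polled (e.g. by a background timer): flush if no record arrived for the whole interval. */
  public boolean shouldFlushWhileIdle() {
    return Duration.between(lastRecordAt, Instant.now()).compareTo(idleInterval) >= 0;
  }
}
```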
@DoNotPanicUA is the buffer really only 15 MB for all destinations? I was under the impression that it was much larger for Redshift and perhaps Snowflake.
@evantahler There is no standard parameter because of different buffer implementations. For example, BigQuery has 15 MB, and Snowflake has 200 MB. I want to highlight that time-based buffer flushing conflicts with the idea of using buffers, and might lead to performance degradation where it's not necessary at all. Right now, we have some destinations where the buffer is configurable from the UI, and elsewhere it's hardcoded.
Please correct me if I'm wrong. We have some combinations of sources and destinations where the source provides records very slowly while the destination has a huge buffer, like Snowflake, and this issue is trying to solve that. If so, we could optimize buffers and use the recommended buffer size for "problematic" sources. It might help without affecting connections where the big buffer is an advantage.
Yes, we are comfortable with the speed tradeoff in which we flush the buffer more often than required to have more resilient syncs. Thanks for calling that out!
A destination failing to regularly flush/commit creates a few classes of problems:
- If the source is slow, users don't see any data until the end of the sync and can't visualize progress
- If the source is fast but records are small, the buffer limit won't be hit until the end of the sync
- If the destination crashes before flushing, data is lost and sync cannot be partially resumed
- Memory leaks in the worker as we wait for state to be echoed
Comment for grooming: we may not take this into work until we are done with MySQL, but we need to estimate the work in order to correctly prioritize it
Notes from grooming: until normalization runs, the data is not useful to the customer. For destinations, checkpointing will not improve "time to value" for customers, but it will improve durability (some data is written in case of a crash). However, there is still incremental value in flushing data based on time or volume before the end of the sync - this will help long-running jobs that fail before loading all the data into the destination. The ultimate solution may be to move the normalization process into the destination, so that the data ends up in its final, useful state sooner, but that is a larger project.
For the scope of this ticket: add periodic flushing/checkpointing (based on time and/or volume). Whether the checkpointing should be based on time needs to be re-evaluated, taking into account how destinations load data. Things to consider: object storage does not work well with many small files, large batches are usually more efficient to load into data warehouses, and writing very large files is more error-prone.
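As a hedged sketch of the scoped behavior (flush on time and/or volume), the decision could look something like the following; the thresholds and names are assumptions, not a shipped implementation:

```java
import java.time.Duration;
import java.time.Instant;

public class FlushPolicy {

  private final Duration maxAge;      // e.g. 15 minutes
  private final long maxBufferBytes;  // e.g. the destination's staging buffer limit

  public FlushPolicy(Duration maxAge, long maxBufferBytes) {
    this.maxAge = maxAge;
    this.maxBufferBytes = maxBufferBytes;
  }

  /** Flush when either the buffered data is old enough or it is large enough. */
  public boolean shouldFlush(Instant oldestUnflushedStateAt, long bufferedBytes) {
    final boolean timeExceeded = oldestUnflushedStateAt != null
        && Duration.between(oldestUnflushedStateAt, Instant.now()).compareTo(maxAge) >= 0;
    final boolean volumeExceeded = bufferedBytes >= maxBufferBytes;
    return timeExceeded || volumeExceeded;
  }
}
```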
Converted this to Epic: @grishick TODO: create individual tickets for destinations
Let me know if you want to talk more about why time was chosen to be the checkpointing threshold - the PRD has the reasoning. Regardless of whether the user can "use" the data because it's normalized or not, checkpointing is all about not needing to spend the time re-syncing data we already sent. In the case where the destination crashes, this becomes very helpful.
Related OC issue: 585
Summarizing some Slack discussion:
There's maybe a technically feasible implementation in platform (where it detects a lack of checkpointing and forces the destination to commit). This has some advantages (possibly-reduced implementation cost, more consistent behavior across connectors), but also some severe disadvantages:
- The most obvious implementation (sending EOF to the destination + start a new sync) can be costly on the source side (e.g. Hadoop-based sources)
- Destination connectors might be required to checkpoint less frequently than whatever platform is trying to enforce. (e.g. certain reverse ETL destinations can only accept one upload per hour). It would get really hairy to customize this per-destination, rather than just having the destination connector own it entirely.
So it still makes sense to implement checkpointing within the connectors. We'll need to eat the per-connector development cost (though e.g. this ticket is an example of implementing once and reaping the benefits across several related destinations). We can add checkpointing as a beta/GA criteria to enforce the minimum frequency, while still giving ourselves leeway in case we need to make an exception.
@ryankfu Is this epic complete?