
JDBC Destinations periodically flush staged data based on time interval (checkpointing)

Open evantahler opened this issue 2 years ago • 12 comments

Part of the Connector Checkpointing Epic

We want to ensure that destinations commit any buffered RECORDS to the destination's _airbyte_raw tables periodically, based on a time interval. This issue is to create shared code for all of our Java destinations that allows for that behavior.

  • The time interval for flushing buffered RECORDS should be controlled by an environment variable with a sensible default - every 15 minutes (see the sketch after this list).
  • This time interval should be able to be overridden per individual destination if we learn that a specific destination is slowed down by these frequent commits. We will only learn if every 15 minutes is too frequent once we test...
  • We should continue to flush when it makes sense for the destination (e.g. the number of bytes in the buffer is approaching a limit for the destination database)
  • We will need to re-publish all of the Java destinations once the above changes are added to the CDK.
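A minimal sketch of what that shared CDK helper could look like, assuming a hypothetical `FLUSH_INTERVAL_MINUTES` environment variable and a per-destination override hook (the names here are illustrative, not the actual CDK API):

```java
import java.time.Duration;
import java.util.Optional;

public class FlushIntervalConfig {

  // Hypothetical environment variable name; the real CDK may pick a different one.
  private static final String ENV_VAR = "FLUSH_INTERVAL_MINUTES";
  private static final Duration DEFAULT_INTERVAL = Duration.ofMinutes(15);

  /**
   * Resolves the flush interval: a per-destination override wins, then the
   * environment variable, then the 15-minute default.
   */
  public static Duration resolve(final Optional<Duration> destinationOverride) {
    if (destinationOverride.isPresent()) {
      return destinationOverride.get();
    }
    final String fromEnv = System.getenv(ENV_VAR);
    if (fromEnv != null && !fromEnv.isBlank()) {
      return Duration.ofMinutes(Long.parseLong(fromEnv));
    }
    return DEFAULT_INTERVAL;
  }
}
```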

Getting specific, the measure of checkpointability for a destination is the time between when a state message is received from the source/platform and when the next state message is emitted from the destination, indicating a successful flush.

  1. The timer starts when the destination receives a state message
  2. The timer is reset whenever the destination flushes from its temporary storage (flat files) to the "real" destination (flat file is uploaded and the INSERT statement completes). Said another way, the timer can be reset when the state message is emitted back from the destination

Possible implementations:

  • A real timer thread that calls the flush periodically
    • handles the case when sources are slow or get "stuck" and don't emit records (but are still running)
  • Every time a record is received, the timer's start time is checked and the destination determines whether it is time to flush or not (sketched below)
    • probably simpler than the timer thread
    • depends on the source continuing to send records
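A rough sketch of the second (check-on-record) option, assuming a hypothetical trigger class owned by the buffering code; the timer semantics follow the two rules above (start when a state message is received, reset once the flush completes and the state is emitted):

```java
import java.time.Duration;
import java.time.Instant;

public class PeriodicFlushTrigger {

  private final Duration flushInterval;
  // Set when a state message is received; cleared once the buffer is flushed
  // and that state message is emitted back to the platform.
  private Instant timerStartedAt = null;

  public PeriodicFlushTrigger(final Duration flushInterval) {
    this.flushInterval = flushInterval;
  }

  /** Call when the destination receives a STATE message. */
  public void onStateReceived(final Instant now) {
    if (timerStartedAt == null) {
      timerStartedAt = now;
    }
  }

  /** Call for every RECORD message; returns true when it is time to flush. */
  public boolean shouldFlushOnRecord(final Instant now) {
    return timerStartedAt != null
        && Duration.between(timerStartedAt, now).compareTo(flushInterval) >= 0;
  }

  /** Call after the buffer is flushed and the state message is emitted. */
  public void onFlushCompleted() {
    timerStartedAt = null;
  }
}
```

The same state could also back the timer-thread variant: a background thread would poll the elapsed time on a schedule instead of on each record, which covers the "source is stuck and not emitting records" case.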

evantahler avatar Jul 15 '22 20:07 evantahler

@evantahler I had questions for @sherifnada about how destination connectors flush data, and it looks like this is the appropriate issue for my topic.

I see that this issue is about flushing the buffer not only on size overflow but also on a timer.

Is it also worth flushing the buffer when a STATE message is received from the source connector(s)?

P.S. Flushing by timer would also help with my problem with the amazon-ads connector: https://github.com/airbytehq/oncall/issues/585

grubberr avatar Aug 29 '22 14:08 grubberr

I propose resetting the timer after we receive a RECORD message. In typical cases, the buffer does the trick most efficiently. But when the sync is still running and we are not getting any records for a while, it makes sense to flush the buffer while we wait.

DoNotPanicUA avatar Aug 29 '22 15:08 DoNotPanicUA

I propose resetting the timer after we receive a RECORD message. In typical cases, the buffer does the trick most efficiently.

Assuming that a source regularly sends state messages, we still want the destination to flush every 15 minutes in case the destination crashes. If we wait until the Source is complete, and then the Destination crashes, we lose all of the data we just sent and have to start over.

evantahler avatar Aug 29 '22 16:08 evantahler

Assuming that a source regularly sends state messages, we still want the destination to flush every 15 minutes in case the destination crashes. If we wait until the Source is complete, and then the Destination crashes, we lose all of the data we just sent and have to start over.

The normal buffer size is about 15 MB. If we regularly get records, the buffer flushes more frequently than every 15 minutes. Adding additional checkpoints will not save us much but will slow down all normal syncs. I see only one edge case where checkpointing can be useful: when we wait a long time for a new record. If so, then we should start the time interval from the last processed record and flush the buffer.
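Roughly, that would mean measuring the interval from the last processed record rather than from the last flush, something like this (illustrative sketch only, not CDK code):

```java
import java.time.Duration;
import java.time.Instant;

// Flush only when the sync has gone quiet: no new record for `idleInterval`.
public class IdleFlushCheck {

  private final Duration idleInterval;
  private Instant lastRecordAt = Instant.now();

  public IdleFlushCheck(final Duration idleInterval) {
    this.idleInterval = idleInterval;
  }

  /** Call for every RECORD message. */
  public void onRecord(final Instant now) {
    lastRecordAt = now;
  }

  /** Polled by a background thread, since no records may be arriving. */
  public boolean shouldFlush(final Instant now) {
    return Duration.between(lastRecordAt, now).compareTo(idleInterval) >= 0;
  }
}
```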

DoNotPanicUA avatar Aug 29 '22 19:08 DoNotPanicUA

@DoNotPanicUA is the buffer really only 15MB for all destinations? I was under the impression that it was much larger for redshift and perhaps snowflake.

evantahler avatar Aug 29 '22 23:08 evantahler

@evantahler There is no standard parameter because of the different buffer implementations. For example, BigQuery has 15 MB and Snowflake has 200 MB. I want to highlight that time-based buffer flushing conflicts with the idea of using buffers, and it might lead to performance degradation where it's not necessary at all. Right now, we have some destinations where the buffer is configurable from the UI, and elsewhere it's hardcoded.

Please correct me if I'm wrong: we have some combinations of sources and destinations where the source provides records very slowly while the destination has a huge buffer, like Snowflake, and this issue is trying to solve that. If so, we could optimize the buffers and use a recommended buffer size for "problematic" sources. That might help without affecting connections where the big buffer is an advantage.

DoNotPanicUA avatar Aug 30 '22 08:08 DoNotPanicUA

Yes, we are comfortable with the speed tradeoff of flushing the buffer more often than strictly required in order to have more resilient syncs. Thanks for calling that out!

A destination failing to regularly flush/commit creates a few classes of problems:

  • If the source is slow, users don't see any data until the end of the sync and can't visualize progress
  • If the source is fast but records are small, the buffer limit won't be hit until the end of the sync
  • If the destination crashes before flushing, data is lost and sync cannot be partially resumed
  • Memory leaks in the worker as we wait for state to be echoed

evantahler avatar Aug 30 '22 17:08 evantahler

Comment for grooming: we may not start work on this until we are done with MySQL, but we need to estimate the work in order to correctly prioritize it

grishick avatar Aug 31 '22 23:08 grishick

Notes from grooming: until normalization runs, the data is not useful for the customer. For destinations, checkpointing will not improve "time to value" for customers, but it will improve durability (some data is written in case of a crash). However, there is still incremental value in flushing data based on time or volume before the end of the sync - this will help long-running jobs that fail before loading all the data into the destination. The ultimate solution may be to move the normalization process into the destination, so that the data ends up in its final useful state sooner, but that's a larger project.

For the scope of this ticket: add periodic flushing/checkpointing (based on time and/or volume). Whether the checkpointing is based on time needs to be re-evaluated taking into account how destinations load data. Things to consider: object storage does not work well with many small files, large batches are usually more efficient to load into DWs, writing very large files is more error prone.
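For the time-and/or-volume condition in scope here, a minimal sketch of the decision point (the thresholds and names are illustrative, not the real connector values):

```java
import java.time.Duration;
import java.time.Instant;

public class FlushDecision {

  private final long maxBufferBytes;        // e.g. 15 MB for BigQuery, 200 MB for Snowflake (per the discussion above)
  private final Duration maxTimeSinceFlush; // e.g. 15 minutes

  public FlushDecision(final long maxBufferBytes, final Duration maxTimeSinceFlush) {
    this.maxBufferBytes = maxBufferBytes;
    this.maxTimeSinceFlush = maxTimeSinceFlush;
  }

  /** Flush when either the volume threshold or the time threshold is reached. */
  public boolean shouldFlush(final long bufferedBytes, final Instant lastFlushAt, final Instant now) {
    return bufferedBytes >= maxBufferBytes
        || Duration.between(lastFlushAt, now).compareTo(maxTimeSinceFlush) >= 0;
  }
}
```

Whether the time condition stays depends on the re-evaluation above; destinations backed by object storage, which handle many small files poorly, might lean more heavily on the volume condition.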

grishick avatar Sep 07 '22 17:09 grishick

Converted this to an Epic. TODO (@grishick): create individual tickets for destinations.

grishick avatar Sep 07 '22 17:09 grishick

Let me know if you want to talk more about why time was chosen as the checkpointing threshold - the PRD has the reasoning. Regardless of whether the user can "use" the data (because it's normalized or not), checkpointing is all about not needing to spend the time re-syncing data we already sent. In the case where the destination crashes, this becomes very helpful.

evantahler avatar Sep 08 '22 01:09 evantahler

Related OC issue: 585

grishick avatar Sep 13 '22 22:09 grishick

Summarizing some Slack discussion:

There may be a technically feasible implementation in the platform (where it detects a lack of checkpointing and forces the destination to commit). This has some advantages (possibly reduced implementation cost, more consistent behavior across connectors), but also some severe disadvantages:

  • The most obvious implementation (sending EOF to the destination and starting a new sync) can be costly on the source side (e.g. Hadoop-based sources)
  • Destination connectors might be required to checkpoint less frequently than whatever the platform is trying to enforce (e.g. certain reverse ETL destinations can only accept one upload per hour). It would get really hairy to customize this per destination, rather than just having the destination connector own it entirely.

So it still makes sense to implement checkpointing within the connectors. We'll need to eat the per-connector development cost (though this ticket, for example, is a case of implementing once and reaping the benefits across several related destinations). We can add checkpointing as a beta/GA criterion to enforce the minimum frequency, while still giving ourselves leeway in case we need to make an exception.

edgao avatar Sep 14 '22 23:09 edgao

@ryankfu Is this epic complete?

evantahler avatar Apr 11 '23 19:04 evantahler