Unable to drain pipelines due to seemingly irrelevant timestamp shift validation
Beam 2.38.0
I encounter the following stacktrace when I try to drain a Dataflow pipeline. During the normal execution the pipeline is flawless, but gets stuck during draining,
I don't explicitly modify windowing, triggering, timestamps, etc, but the built-in IOs might (AFAIK the KafkaIO doesn't, but the using load jobs for BQ insertions does due to the triggering frequency).
The pipeline that produces this:
- KafkaIO.readBytes()
- a custom minimalistic DoFn doing the parsing of KafkaRecord.getKV().getValue()
- BigQueryIO.write() **** method(FILE_LOADS) **** triggeringFrequency(...)
Error message from worker: java.lang.IllegalStateException: TimestampCombiner moved element from 294247-01-10T04:00:54.775Z
(TIMESTAMP_MAX_VALUE) to earlier time 294247-01-09T04:00:54.775Z (end of global window) for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@71e075dc
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.WatermarkHold.shift(WatermarkHold.java:120)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.WatermarkHold.addElementHold(WatermarkHold.java:157)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.WatermarkHold.addHolds(WatermarkHold.java:104)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:610)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1445)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1120)
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)
java.lang.IllegalStateException: TimestampCombiner
moved element from 294247-01-10T04:00:54.775Z (TIMESTAMP_MAX_VALUE) to earlier time 294247-01-09T04:00:54.775Z
(end of global window) for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@71e075dc
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.WatermarkHold.shift(WatermarkHold.java:120)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.WatermarkHold.addElementHold(WatermarkHold.java:157)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.WatermarkHold.addHolds(WatermarkHold.java:104)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:610)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1445)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1120)
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)
Imported from Jira BEAM-14544. Original Jira may contain additional context. Reported by: bnemeth.
I am experiencing the exact same issue when using method(STORAGE_WRITE_API), has any resolution been reached? I notice this bug when using Apache Beam version 2.37.0.
@joey-berman I'm on 2.40.0 now and did not face this issue recently, but I have no idea if it's just coincidence, or it's a legit fix.
@nbali When moving to 2.40.0 did you happen to notice a different timestamp issue, namely where the timestamp associated to data for an unbounded PCollection is greater than MAX_TIMESTAMP_VALUE and thus is flagged as an error with the following message: Error message from worker: java.lang.IllegalArgumentException: Cannot output with timestamp 294247-01-10T04:00:54.776Z. Output timestamps must be no earlier than the timestamp of the current input or timer (294247-01-10T04:00:54.776Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew. org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.checkTimestamp(SimpleDoFnRunner.java:259) org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$1300(SimpleDoFnRunner.java:85) org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.output(SimpleDoFnRunner.java:843) org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76) org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.finalizeStream(StorageApiWritesShardedRecords.java:536) org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.onTimer(StorageApiWritesShardedRecords.java:550)
@joey-berman I did not, but I have non-zero getAllowedTimestampSkew on my DoFns, so if I get the message right this can't happen to me anyway.
Previously, moving a timestamp backwards could result in dropped data. Today I think we have simplified the model so this may be safe in many situations. It would be potentially interesting design work to figure out the new constraints, if anyone is interested.
But to be clear, drain itself is outside of Beam.