
Unable to drain pipelines due to seemingly irrelevant timestamp shift validation

Open damccorm opened this issue 3 years ago • 3 comments

Beam 2.38.0

I get the following stack trace when I try to drain a Dataflow pipeline. During normal execution the pipeline runs flawlessly, but it gets stuck while draining.

I don't explicitly modify windowing, triggering, timestamps, etc., but the built-in IOs might (AFAIK KafkaIO doesn't, but using load jobs for BQ insertions does, due to the triggering frequency).

The pipeline that produces this:

  • KafkaIO.readBytes()
  • a custom minimalistic DoFn doing the parsing of KafkaRecord.getKV().getValue()
  • BigQueryIO.write() with method(FILE_LOADS) and triggeringFrequency(...)

 


Error message from worker: java.lang.IllegalStateException: TimestampCombiner moved element from 294247-01-10T04:00:54.775Z (TIMESTAMP_MAX_VALUE) to earlier time 294247-01-09T04:00:54.775Z (end of global window) for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@71e075dc
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.WatermarkHold.shift(WatermarkHold.java:120)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.WatermarkHold.addElementHold(WatermarkHold.java:157)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.WatermarkHold.addHolds(WatermarkHold.java:104)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:610)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
    org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
    org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1445)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1120)
    org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)

Imported from Jira BEAM-14544. Original Jira may contain additional context. Reported by: bnemeth.

damccorm, Jun 05 '22
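The two timestamps in the error message are exactly one day apart. The stdlib-only Java sketch below reproduces them, assuming that TIMESTAMP_MAX_VALUE is Long.MAX_VALUE microseconds truncated to milliseconds and that the global window ends one day before it (which matches the values in the message). The shift() method is a hypothetical, simplified model of the check that failed in WatermarkHold.shift, assuming an end-of-window style TimestampCombiner that assigns the window's max timestamp to the hold; it is not Beam's implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Stdlib-only model of the timestamps in the error message (not Beam code).
final class GlobalWindowTimestamps {

    // Assumption: mirrors BoundedWindow.TIMESTAMP_MAX_VALUE, i.e. Long.MAX_VALUE
    // microseconds truncated to milliseconds.
    static final Instant TIMESTAMP_MAX_VALUE =
            Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));

    // Assumption: mirrors the global window's max timestamp, one day earlier.
    static final Instant END_OF_GLOBAL_WINDOW =
            TIMESTAMP_MAX_VALUE.minus(Duration.ofDays(1));

    // Hypothetical, simplified model of WatermarkHold.shift: an end-of-window style
    // combiner assigns the window's max timestamp to the hold, and the hold must
    // never be earlier than the element it holds.
    static Instant shift(Instant elementTimestamp, Instant windowMaxTimestamp) {
        Instant shifted = windowMaxTimestamp;
        if (shifted.isBefore(elementTimestamp)) {
            throw new IllegalStateException(String.format(
                    "TimestampCombiner moved element from %s to earlier time %s",
                    elementTimestamp, shifted));
        }
        return shifted;
    }

    public static void main(String[] args) {
        System.out.println(TIMESTAMP_MAX_VALUE);   // +294247-01-10T04:00:54.775Z
        System.out.println(END_OF_GLOBAL_WINDOW);  // +294247-01-09T04:00:54.775Z

        // An ordinary element gets a hold at the end of the global window.
        Instant ordinary = Instant.parse("2022-06-05T00:00:00Z");
        System.out.println(shift(ordinary, END_OF_GLOBAL_WINDOW));

        // An element stamped TIMESTAMP_MAX_VALUE would be moved back one day: rejected.
        try {
            shift(TIMESTAMP_MAX_VALUE, END_OF_GLOBAL_WINDOW);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, an element stamped TIMESTAMP_MAX_VALUE already lies past the end of the global window, so a hold placed at the window's end necessarily moves it earlier, which is the condition the check rejects.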

I am experiencing the exact same issue when using method(STORAGE_WRITE_API). Has any resolution been reached? I noticed this bug with Apache Beam 2.37.0.

joey-berman, Aug 01 '22

@joey-berman I'm on 2.40.0 now and haven't hit this issue recently, but I don't know whether that's a coincidence or a genuine fix.

nbali, Aug 02 '22

@nbali When you moved to 2.40.0, did you happen to notice a different timestamp issue, where the timestamp associated with data in an unbounded PCollection is greater than TIMESTAMP_MAX_VALUE and is therefore flagged as an error with the following message?

Error message from worker: java.lang.IllegalArgumentException: Cannot output with timestamp 294247-01-10T04:00:54.776Z. Output timestamps must be no earlier than the timestamp of the current input or timer (294247-01-10T04:00:54.776Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.checkTimestamp(SimpleDoFnRunner.java:259)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$1300(SimpleDoFnRunner.java:85)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.output(SimpleDoFnRunner.java:843)
    org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
    org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.finalizeStream(StorageApiWritesShardedRecords.java:536)
    org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.onTimer(StorageApiWritesShardedRecords.java:550)

joey-berman, Aug 10 '22
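The message in the previous comment describes an allowed range for output timestamps: no earlier than the input or timer timestamp minus the allowed skew, and no later than 294247-01-10T04:00:54.775Z. The stdlib-only Java sketch below models that range; it is not Beam's SimpleDoFnRunner code, and the class name OutputTimestampBounds and method checkOutputTimestamp are hypothetical. It assumes the upper bound is TIMESTAMP_MAX_VALUE, computed as Long.MAX_VALUE microseconds truncated to milliseconds.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Stdlib-only model of the bounds described in the error message (not Beam code).
final class OutputTimestampBounds {

    // Assumption: mirrors BoundedWindow.TIMESTAMP_MAX_VALUE (Long.MAX_VALUE micros, in millis).
    static final Instant TIMESTAMP_MAX_VALUE =
            Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));

    // Hypothetical check: returns normally if outputTs lies within
    // [inputTs - allowedSkew, TIMESTAMP_MAX_VALUE], otherwise throws.
    static void checkOutputTimestamp(Instant inputTs, Instant outputTs, Duration allowedSkew) {
        Instant lowerBound = inputTs.minus(allowedSkew);
        if (outputTs.isBefore(lowerBound) || outputTs.isAfter(TIMESTAMP_MAX_VALUE)) {
            throw new IllegalArgumentException(String.format(
                    "Cannot output with timestamp %s. Output timestamps must be no earlier than %s"
                            + " and no later than %s.",
                    outputTs, lowerBound, TIMESTAMP_MAX_VALUE));
        }
    }

    public static void main(String[] args) {
        // In the reported trace, the timer fired 1 ms past TIMESTAMP_MAX_VALUE
        // and the DoFn tried to output at the timer's timestamp.
        Instant timerTs = TIMESTAMP_MAX_VALUE.plusMillis(1);
        for (Duration skew : new Duration[] {Duration.ZERO, Duration.ofDays(365)}) {
            try {
                checkOutputTimestamp(timerTs, timerTs, skew);
                System.out.println("skew " + skew + ": accepted");
            } catch (IllegalArgumentException e) {
                System.out.println("skew " + skew + ": rejected");
            }
        }
    }
}
```

In this model the allowed skew only relaxes the lower bound, matching the wording of the message, so main prints "rejected" for both skew values: the output timestamp itself is already 1 ms past the upper bound.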

@joey-berman I did not, but my DoFns have a non-zero getAllowedTimestampSkew(), so if I read the message correctly, this can't happen to me anyway.

nbali, Sep 09 '22

Previously, moving a timestamp backwards could result in dropped data. Today I think we have simplified the model enough that this may be safe in many situations. Figuring out the new constraints could be interesting design work, if anyone is interested.

kennknowles, Dec 01 '22

But to be clear, drain itself is outside of Beam.

kennknowles, Dec 01 '22