DataflowTemplates
[Bug]: TextToPubsubStreamLT and TextToBigQueryStreamLT don't drain
Related Template(s)
TextToPubsubStreamLT, TextToBigQueryStreamLT
What happened?
Context
After commit 44aa740552697517e89853e5cc33c2c39beb252c, the TextToPubsubLT and TextToBigQueryLT tests started failing.
Issue
As soon as the test finishes, the Dataflow job starts draining, but the drain never completes: the job keeps draining until it is canceled. Example of such a job: TextToPubsubLT Dataflow job
Cause
According to the Dataflow documentation, pipelines that include a Splittable DoFn cannot be drained.
The TextToPubsubStream and TextToBigQueryStreaming templates use TextIO.read().watchForNewFiles(), which is implemented with a Splittable DoFn.
Solution
Replace the waitForConditionsAndFinish method, which drains the job, with waitForConditionsAndCancel, which cancels it.
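The proposed teardown can be sketched as a poll-then-cancel loop. This is a minimal, self-contained illustration, not the test framework's actual code: JobControl, Outcome, and this waitForConditionsAndCancel signature are hypothetical stand-ins for the real PipelineOperator and Dataflow client. The point it shows is that the job is always canceled after the conditions are met (or the timeout expires), so teardown never waits on a drain that a Splittable DoFn pipeline may never finish.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

/** Sketch of a "wait for conditions, then cancel" teardown (hypothetical names). */
public class CancelInsteadOfDrain {

  /** Hypothetical job handle; NOT the Beam or Dataflow API. */
  interface JobControl {
    void drain();

    void cancel();
  }

  enum Outcome { CONDITIONS_MET, TIMEOUT }

  /**
   * Polls every condition until all are true or the timeout elapses, then
   * cancels the job. Cancel is used instead of drain because a pipeline with
   * a Splittable DoFn (such as TextIO.read().watchForNewFiles()) may never
   * finish draining.
   */
  @SafeVarargs
  static Outcome waitForConditionsAndCancel(
      JobControl job, Duration timeout, Duration pollInterval, Supplier<Boolean>... conditions) {
    Instant deadline = Instant.now().plus(timeout);
    Outcome outcome = Outcome.TIMEOUT;
    while (Instant.now().isBefore(deadline)) {
      if (allTrue(conditions)) {
        outcome = Outcome.CONDITIONS_MET;
        break;
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        // Preserve the interrupt and stop waiting; teardown still happens below.
        Thread.currentThread().interrupt();
        break;
      }
    }
    // Always cancel, whether or not the conditions were met; never drain.
    job.cancel();
    return outcome;
  }

  @SafeVarargs
  private static boolean allTrue(Supplier<Boolean>... conditions) {
    for (Supplier<Boolean> condition : conditions) {
      if (!condition.get()) {
        return false;
      }
    }
    return true;
  }
}
```

Canceling stops the job immediately and discards in-flight data, which is acceptable for a load test whose checks have already run by the time teardown starts.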
Beam Version
Newer than 2.46.0
Relevant log output
2023-06-10 11:56:08.244 GET
Drain request is committed for workflow job: 2023-06-10_00_51_41-2277746579702030254.
2023-06-10 16:00:39.683 GET
Cancel request is committed for workflow job: 2023-06-10_00_51_41-2277746579702030254.
2023-06-10 16:00:39.736 GET
Finished operation Read Text Data/Create filepattern/Create.Values/Read(CreateSource)/Read(CreateSource)/Read(BoundedToUnboundedSourceAdapter)/DataflowRunner.StreamingUnboundedRead.ReadWithIds+Read Text Data/Create filepattern/Create.Values/Read(CreateSource)/Read(CreateSource)/Read(BoundedToUnboundedSourceAdapter)/StripIds+Read Text Data/Create filepattern/MapElements/Map+Read Text Data/Match All/Watch.Growth/ParDo(WatchGrowth)/ParMultiDo(WatchGrowth)/Pair with initial restriction+Read Text Data/Match All/Watch.Growth/ParDo(WatchGrowth)/ParMultiDo(WatchGrowth)/Split restriction+Read Text Data/Match All/Watch.Growth/ParDo(WatchGrowth)/ParMultiDo(WatchGrowth)/Explode windows+Read Text Data/Match All/Watch.Growth/ParDo(WatchGrowth)/ParMultiDo(WatchGrowth)/Assign unique key/AddKeys/Map+s8/GroupByKeyRaw/WriteStream
2023-06-10 16:00:39.763 GET
Finished operation Read Text Data/Match All/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/ReadStream+Read Text Data/Match All/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/MergeBuckets+Read Text Data/Match All/Reshuffle.ViaRandomKey/Reshuffle/ExpandIterable+Read Text Data/Match All/Reshuffle.ViaRandomKey/Values/Values/Map+Read Text Data/Read Matches/ParDo(ToReadableFile)+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Split into ranges+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Pair with random key+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Reshuffle/Window.Into()/Window.Assign+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Reshuffle/GroupByKey/WriteStream
2023-06-10 16:00:39.763 GET
Finished operation s8/GroupByKeyRaw/ReadStream+s8/SplittableProcess+Read Text Data/Match All/Watch.Growth/ParDo(PollResultSplit)/ParMultiDo(PollResultSplit)/Pair with initial restriction+Read Text Data/Match All/Watch.Growth/ParDo(PollResultSplit)/ParMultiDo(PollResultSplit)/Split restriction+Read Text Data/Match All/Watch.Growth/ParDo(PollResultSplit)/ParMultiDo(PollResultSplit)/Explode windows+Read Text Data/Match All/Watch.Growth/ParDo(PollResultSplit)/ParMultiDo(PollResultSplit)/Assign unique key/AddKeys/Map+s13/GroupByKeyRaw/WriteStream
2023-06-10 16:00:39.763 GET
Finished operation Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Reshuffle/GroupByKey/ReadStream+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Reshuffle/GroupByKey/MergeBuckets+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Reshuffle/ExpandIterable+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Values/Values/Map+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Read ranges+Write to PubSub/MapElements/Map+Write to PubSub/PubsubUnboundedSink
2023-06-10 16:00:39.781 GET
Finished operation s13/GroupByKeyRaw/ReadStream+s13/SplittableProcess+Read Text Data/Match All/Values/Values/Map+Read Text Data/Match All/Reshuffle.ViaRandomKey/Pair with random key+Read Text Data/Match All/Reshuffle.ViaRandomKey/Reshuffle/Window.Into()/Window.Assign+Read Text Data/Match All/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/WriteStream
2023-06-10 16:00:39.914 GET
Stopping worker pool...
2023-06-10 16:02:52.073 GET
Worker pool stopped.