[Bug]: TextToPubsubStreamLT and TextToBigQueryStreamLT don't drain

Open elizaveta-lomteva opened this issue 1 year ago • 0 comments

Related Template(s)

TextToPubsubStreamLT, TextToBigQueryStreamLT

What happened?

Context

After commit 44aa740552697517e89853e5cc33c2c39beb252c, the TextToPubsubLT and TextToBigQueryLT tests started failing.

Issue

As soon as the test finishes, the Dataflow job starts draining, but the drain never completes: the job stays in the draining state until it is canceled. Example of such a job: TextToPubsubLT Dataflow job

Cause

According to the Dataflow documentation, pipelines that include calls to a Splittable DoFn won't drain. The TextToPubsubStream and TextToBigQueryStreaming templates read their input with TextIO.read().watchForNewFiles(), which is a Splittable DoFn, so a drain request on these jobs never finishes.

Solution

In these load tests, replace the call to waitForConditionsAndFinish, which drains the job, with waitForConditionsAndCancel, which cancels the job instead.
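
To illustrate the idea behind this change, here is a minimal, self-contained sketch of choosing between drain and cancel at teardown. Everything in it (`TeardownSketch`, `JobHandle`, `RecordingJob`, `stop`) is hypothetical and invented for illustration. It does not use the real test framework or the actual signatures of waitForConditionsAndFinish and waitForConditionsAndCancel.

```java
import java.util.ArrayList;
import java.util.List;

public class TeardownSketch {

    /** Hypothetical stand-in for a handle on a running streaming job. */
    interface JobHandle {
        void drain();
        void cancel();
    }

    /** Records which teardown call was made so the choice can be inspected. */
    static final class RecordingJob implements JobHandle {
        final List<String> calls = new ArrayList<>();

        @Override
        public void drain() {
            calls.add("drain");
        }

        @Override
        public void cancel() {
            calls.add("cancel");
        }
    }

    /**
     * Picks the teardown call. Draining is skipped when the pipeline
     * contains a Splittable DoFn, because such a drain may never complete.
     */
    static String stop(JobHandle job, boolean usesSplittableDoFn) {
        if (usesSplittableDoFn) {
            job.cancel();
            return "cancel";
        }
        job.drain();
        return "drain";
    }

    public static void main(String[] args) {
        RecordingJob textToPubsub = new RecordingJob();
        // The template reads with watchForNewFiles(), a Splittable DoFn, so cancel.
        System.out.println(stop(textToPubsub, true)); // prints "cancel"
    }
}
```

The trade-off is that canceling stops the job immediately instead of letting in-flight data finish processing. The proposed fix accepts this because the test's conditions have already been met by the time teardown starts.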

Beam Version

Newer than 2.46.0

Relevant log output

2023-06-10 11:56:08.244 GET
Drain request is committed for workflow job: 2023-06-10_00_51_41-2277746579702030254.
2023-06-10 16:00:39.683 GET
Cancel request is committed for workflow job: 2023-06-10_00_51_41-2277746579702030254.
2023-06-10 16:00:39.736 GET
Finished operation Read Text Data/Create filepattern/Create.Values/Read(CreateSource)/Read(CreateSource)/Read(BoundedToUnboundedSourceAdapter)/DataflowRunner.StreamingUnboundedRead.ReadWithIds+Read Text Data/Create filepattern/Create.Values/Read(CreateSource)/Read(CreateSource)/Read(BoundedToUnboundedSourceAdapter)/StripIds+Read Text Data/Create filepattern/MapElements/Map+Read Text Data/Match All/Watch.Growth/ParDo(WatchGrowth)/ParMultiDo(WatchGrowth)/Pair with initial restriction+Read Text Data/Match All/Watch.Growth/ParDo(WatchGrowth)/ParMultiDo(WatchGrowth)/Split restriction+Read Text Data/Match All/Watch.Growth/ParDo(WatchGrowth)/ParMultiDo(WatchGrowth)/Explode windows+Read Text Data/Match All/Watch.Growth/ParDo(WatchGrowth)/ParMultiDo(WatchGrowth)/Assign unique key/AddKeys/Map+s8/GroupByKeyRaw/WriteStream
2023-06-10 16:00:39.763 GET
Finished operation Read Text Data/Match All/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/ReadStream+Read Text Data/Match All/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/MergeBuckets+Read Text Data/Match All/Reshuffle.ViaRandomKey/Reshuffle/ExpandIterable+Read Text Data/Match All/Reshuffle.ViaRandomKey/Values/Values/Map+Read Text Data/Read Matches/ParDo(ToReadableFile)+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Split into ranges+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Pair with random key+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Reshuffle/Window.Into()/Window.Assign+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Reshuffle/GroupByKey/WriteStream
2023-06-10 16:00:39.763 GET
Finished operation s8/GroupByKeyRaw/ReadStream+s8/SplittableProcess+Read Text Data/Match All/Watch.Growth/ParDo(PollResultSplit)/ParMultiDo(PollResultSplit)/Pair with initial restriction+Read Text Data/Match All/Watch.Growth/ParDo(PollResultSplit)/ParMultiDo(PollResultSplit)/Split restriction+Read Text Data/Match All/Watch.Growth/ParDo(PollResultSplit)/ParMultiDo(PollResultSplit)/Explode windows+Read Text Data/Match All/Watch.Growth/ParDo(PollResultSplit)/ParMultiDo(PollResultSplit)/Assign unique key/AddKeys/Map+s13/GroupByKeyRaw/WriteStream
2023-06-10 16:00:39.763 GET
Finished operation Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Reshuffle/GroupByKey/ReadStream+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Reshuffle/GroupByKey/MergeBuckets+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Reshuffle/ExpandIterable+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Reshuffle/Values/Values/Map+Read Text Data/Via ReadFiles/Read all via FileBasedSource/Read ranges+Write to PubSub/MapElements/Map+Write to PubSub/PubsubUnboundedSink
2023-06-10 16:00:39.781 GET
Finished operation s13/GroupByKeyRaw/ReadStream+s13/SplittableProcess+Read Text Data/Match All/Values/Values/Map+Read Text Data/Match All/Reshuffle.ViaRandomKey/Pair with random key+Read Text Data/Match All/Reshuffle.ViaRandomKey/Reshuffle/Window.Into()/Window.Assign+Read Text Data/Match All/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/WriteStream
2023-06-10 16:00:39.914 GET
Stopping worker pool...
2023-06-10 16:02:52.073 GET
Worker pool stopped.
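
The log above shows how long the job stayed in the draining state before it was canceled. As a rough check, the gap between the two "request is committed" timestamps can be computed with `java.time`. The class and method names below (`DrainGap`, `between`) are hypothetical helpers, and the sketch assumes both timestamps are in the same time zone.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DrainGap {

    // Matches the timestamp layout used in the log lines above.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");

    /** Returns the elapsed time between two log timestamps. */
    static Duration between(String start, String end) {
        return Duration.between(
                LocalDateTime.parse(start, LOG_TIME),
                LocalDateTime.parse(end, LOG_TIME));
    }

    public static void main(String[] args) {
        // Drain request committed vs. cancel request committed.
        Duration gap = between("2023-06-10 11:56:08.244", "2023-06-10 16:00:39.683");
        System.out.println(gap); // prints PT4H4M31.439S
    }
}
```

The job spent a little over four hours draining without finishing, and the stages only reported "Finished operation" after the cancel request.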

elizaveta-lomteva · Jun 12 '23 17:06