
[Bug]: Watermarks and Windowing Not Working with FlinkRunner and KinesisIO Read Transform

Open akashk99 opened this issue 1 year ago • 32 comments

What happened?

When there are idle subtasks in Flink, they don't propagate watermarks to downstream operators, so windowing functions that are based on watermarks never get triggered. When I set the parallelism exactly equal to the number of Kinesis shards, the problem doesn't exist; however, if the number is different, the Flink UI shows no watermarks and my windows never get triggered.

I also have custom DoFns upstream that output with a timestamp, so in theory that should be used as the watermark for windowing; however, this is not the case.

When using native Flink, I have seen solutions such as using methods like "withIdleness", but these don't exist in Beam. Is there something I am missing in my Kinesis config, or is this a known issue with the read transform?

This only occurs on the Flink runner and not on the direct or Dataflow runner. It's also possible this isn't an issue with the Kinesis IO reader; maybe the windowing function should ignore watermarks from idle upstream tasks.
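
For readers unfamiliar with why idle subtasks can stall windows: a downstream operator generally advances its watermark to the minimum across its inputs, so a single input that never reports a watermark pins everything behind it. The sketch below is a toy model in plain Java, not Beam or Flink code; `WatermarkModel`, `Input`, and `NO_WATERMARK` are hypothetical names invented for illustration. It assumes two Kinesis shards read with parallelism 4, so two subtasks never receive data.

```java
import java.util.List;
import java.util.OptionalLong;

/** Toy model (not Beam/Flink code) of combining watermarks from upstream subtasks. */
final class WatermarkModel {
    /** Hypothetical stand-in for "this input has not reported a watermark yet". */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** One upstream subtask: its latest watermark and whether it is marked idle. */
    static final class Input {
        final long watermark;
        final boolean idle;

        Input(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /**
     * Downstream watermark = minimum over the inputs. When honorIdle is true,
     * inputs marked idle are left out of the minimum. If every input is skipped,
     * this simplified model just reports NO_WATERMARK.
     */
    static long combine(List<Input> inputs, boolean honorIdle) {
        OptionalLong min = inputs.stream()
                .filter(in -> !(honorIdle && in.idle))
                .mapToLong(in -> in.watermark)
                .min();
        return min.orElse(NO_WATERMARK);
    }

    public static void main(String[] args) {
        // Assumed scenario: 2 shards, parallelism 4, so subtasks 3 and 4 are idle.
        List<Input> subtasks = List.of(
                new Input(1_000L, false),
                new Input(1_200L, false),
                new Input(NO_WATERMARK, true),
                new Input(NO_WATERMARK, true));

        // Idle inputs included: the minimum stays at NO_WATERMARK, windows never fire.
        System.out.println(combine(subtasks, false) == NO_WATERMARK); // true
        // Idle inputs excluded: the watermark follows the slowest active subtask.
        System.out.println(combine(subtasks, true)); // 1000
    }
}
```

Leaving idle inputs out of the minimum is the idea behind Flink's `WatermarkStrategy.withIdleness`, which is the kind of native option referred to above.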

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [ ] Component: Python SDK
  • [X] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Spark Runner
  • [X] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner

akashk99 avatar Apr 23 '24 18:04 akashk99

This is probably related to https://github.com/apache/beam/issues/29816 which was fixed in https://github.com/apache/beam/pull/30969. Can you verify this on 2.56.0?

je-ik avatar May 07 '24 08:05 je-ik

@je-ik Hi, thanks for the tip, but I just tried upgrading to 2.56 and am still seeing the same error. I am able to get it to work if I set my parallelism to 2, but any other value won't work, which poses issues with autoscaling on AWS. I also notice that the Flink UI is showing me this:

No Watermark (Watermarks are only available if EventTime is used)

I'm seeing this on the watermarks tab for each subtask. Any advice you could give would be very helpful.

akashk99 avatar May 07 '24 16:05 akashk99

Can you provide all the command-line flags, which you pass to the runner, please?

je-ik avatar May 07 '24 17:05 je-ik

I am running it through AWS Managed Flink, so it is kind of a black box there; however, the only pipeline option I am passing is --runner=FlinkRunner, in addition to my application-specific options.

After reading the linked issue, I was able to get it to work locally using the beam_fn_api experiment and upgrading to 2.56, but I'm not really sure what that's doing.

I also noticed that this experiment adds a bunch of operators and results in higher backpressure and lower performance, which means it's most likely not a viable solution.

akashk99 avatar May 07 '24 17:05 akashk99

Strange. Seems like the fix is not working in your case. Can you double-check that you run with 2.56.0 (e.g. that no dependency brings in some older Beam version, shading, etc.)? Other than that, it might help to set the log level to DEBUG and investigate logs around FlinkSourceSplit and SplitEnumerator.

je-ik avatar May 07 '24 17:05 je-ik

Will take a look to ensure no earlier version is being brought in. I was seeing this log:

May 07, 2024 9:51:04 AM org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase notifyNoMoreSplits INFO: Received NoMoreSplits signal from enumerator.

Also just reran and saw this: INFO: Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=org.apache.beam.sdk.io.aws2.kinesis.KinesisSource@6de059f3, splitState.isNull=true, checkpointMark=null}]

Does this give any indication into the issue?

akashk99 avatar May 07 '24 17:05 akashk99

Not really, but it seems you are running the correct 2.56.0 version. The noMoreSplits signal just tells us that there is indeed no more work. However, that should result in emission of the final watermark and should not hold the watermark. Could you patch your Beam version to add more logs? Ideally where the reader emits/computes the watermark, e.g. https://github.com/apache/beam/blob/2ca9af8ecbcb3d0cf403bb4ce7d90a8a362b124d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java#L273

je-ik avatar May 07 '24 18:05 je-ik

I am actually seeing the watermarks work now in the Flink runner on the web UI. I am also seeing the idle tasks from my source reader get finished, which I believe is ideal. However, I am still not getting the logs that occur when my window gets triggered unless beam_fn_api is enabled. Is there something else I need to be doing to get the window to trigger? This works without issue on Dataflow and the DirectRunner.

akashk99 avatar May 07 '24 18:05 akashk99

I am actually seeing the watermarks work now in the Flink runner on the web UI. I am also seeing the idle tasks from my source reader get finished, which I believe is ideal.

Yes, that is how the fix should work.

However, I am still not getting the logs that occur when my window gets triggered unless beam_fn_api is enabled. Is there something else I need to be doing to get the window to trigger?

Can you try setting autoWatermarkInterval?

je-ik avatar May 07 '24 18:05 je-ik

Where is autoWatermarkInterval set in Beam? Is this a pipeline option, or is it set in the Kinesis reader somewhere?

akashk99 avatar May 07 '24 18:05 akashk99

Pipeline option. E.g. --autoWatermarkInterval=100
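
For pipelines where the command line is not easily reachable (for example a managed Flink service), the same option can also be set in code. The following is a minimal sketch, assuming the Flink runner artifact is on the classpath; the class name `AutoWatermarkIntervalExample` and the helper `buildOptions` are hypothetical, and the value 100 (milliseconds) is simply the one suggested above, not a tuned recommendation.

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class AutoWatermarkIntervalExample {

    /** Builds Flink pipeline options with the watermark emission interval set explicitly. */
    static FlinkPipelineOptions buildOptions(String[] args) {
        FlinkPipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        // Programmatic equivalent of passing --autoWatermarkInterval=100.
        // Note: this overrides any value supplied on the command line.
        options.setAutoWatermarkInterval(100L);
        return options;
    }

    public static void main(String[] args) {
        FlinkPipelineOptions options = buildOptions(args);
        System.out.println("autoWatermarkInterval = " + options.getAutoWatermarkInterval());
        // Pipeline.create(options) and the actual transforms would follow here.
    }
}
```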

je-ik avatar May 07 '24 19:05 je-ik

that worked, thank you so much for your help!

akashk99 avatar May 07 '24 19:05 akashk99

Hi, I'm facing a similar issue with Beam 2.56.0 and Flink 1.16.3 and Java SDK.

The problematic pipeline has parallelism 4 and has many slowly updating global window side inputs (using an unbounded GenerateSequence, as in the patterns) that are updated every hour. A watermark is not emitted (I can see "No Watermark (Watermarks are only available if EventTime is used)" in the Flink UI for the following tasks) until all source subtasks emit a message. This is a major issue, since it's not feasible to generate impulses more frequently and the pipeline is not able to make any progress.

Another source is Kafka, and the input topics have 3 partitions. I can see that one of the subtasks becomes FINISHED after some time and only 3 subtasks are active. Some of them receive very few messages, and it's normal for some partitions not to receive data for some time; this is especially true in the testing environment.

I've tried to set --autoWatermarkInterval=100, but it did not have any effect. There are no other Beam-specific properties set.

Any ideas how to fix this? Should I downgrade the Beam version to work around the problem?

yelianevich avatar May 16 '24 13:05 yelianevich

After all side inputs emit their first message, are watermarks then emitted correctly?

je-ik avatar May 16 '24 14:05 je-ik

@je-ik thanks for a quick response!

What I meant is that only when all subtasks of the source emit a record (see 'Records sent' in the image below) can I see a watermark on the next operator in the UI. Each side input is independent in this respect. [image]

Below you can see an example of a source that does not emit a watermark, since only 2 subtasks emitted a record. [image]

yelianevich avatar May 16 '24 14:05 yelianevich

Understood, this is probably unrelated to the issue reported here, can you please create another one? It would be best if you could provide a simple pipeline that exhibits the behavior you observe.

je-ik avatar May 16 '24 14:05 je-ik

Thanks, I'll create a separate issue.

What do you think about the issue with KafkaIO?

The topic has 3 partitions. As you can see from the screenshot below, it received just one record in one of the partitions. The watermark was not propagated further in this case.

I've tried to set --autoWatermarkInterval=100, but it did not have any effect. There are no other Beam-specific properties set. [image]

yelianevich avatar May 16 '24 14:05 yelianevich

No, I don't think it is related to KafkaIO, more likely some subtlety related to refactoring of Flink runner sources, see https://github.com/apache/beam/pull/25525

je-ik avatar May 16 '24 14:05 je-ik

@je-ik Do you have any suggestions for how to work around the issue when there are several idle Kafka partitions/topics in the topology that hold back overall progress? It used to work with Beam 2.45.0 and Flink 1.15.0, but it seems the behavior has changed since then, and it does not work as expected with Beam 2.56.0 and Flink 1.16.3.

yelianevich avatar May 17 '24 11:05 yelianevich

I don't know if there are any workarounds, as the described behavior seems to be an (unknown) bug. It needs further investigation. Could you please provide a simplified pipeline that is affected by this?

je-ik avatar May 17 '24 11:05 je-ik

I am experiencing what I believe is much the same issue, using FixedWindows when running on the FlinkRunner.

I am also using processing time, and from checking the trace logs I can tell there's something wrong with the watermarks: WatermarkHold.addHolds: element hold at 2024-05-19T07:52:59.999Z is on time for key:aaa-bbb-ccc; window:[2024-05-19T07:52:00.000Z..2024-05-19T07:53:00.000Z); inputWatermark:-290308-12-21T19:59:05.225Z; outputWatermark:-290308-12-21T19:59:05.225Z
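
A note on reading that log line: the odd-looking value `-290308-12-21T19:59:05.225Z` matches the minimum timestamp Beam uses (`BoundedWindow.TIMESTAMP_MIN_VALUE`, which is derived from `Long.MIN_VALUE` microseconds), so the input and output watermarks have not advanced at all. The plain-Java sketch below reproduces that value with `java.time`; the class and method names are hypothetical and it does not depend on Beam.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

/** Sketch: recognise a watermark that is still at the minimum representable timestamp. */
final class MinWatermarkCheck {

    /** Long.MIN_VALUE microseconds truncated to milliseconds, as an Instant. */
    static Instant minTimestamp() {
        return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
    }

    /** True when the logged watermark text equals the minimum, i.e. it never moved. */
    static boolean neverAdvanced(String loggedWatermark) {
        return minTimestamp().toString().equals(loggedWatermark);
    }

    public static void main(String[] args) {
        System.out.println(minTimestamp()); // -290308-12-21T19:59:05.225Z
        System.out.println(neverAdvanced("-290308-12-21T19:59:05.225Z")); // true
        System.out.println(neverAdvanced("2024-05-19T07:52:59.999Z")); // false
    }
}
```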

yardenbm avatar May 21 '24 09:05 yardenbm

@yardenbm do you have idle partitions, or do all partitions contain data at all times?

je-ik avatar May 21 '24 11:05 je-ik

I tested various versions of Apache Beam with Apache Flink 1.15.4, and the issue started to appear in 2.52.0. Apache Beam 2.46.0 - 2.51.0 do not have this issue with Flink 1.15.4. Hopefully this helps. Meanwhile, I'll try to create a minimal example that reproduces the problem.

yelianevich avatar May 21 '24 13:05 yelianevich

I don't know if there are any workarounds, as the described behavior seems to be an (unknown) bug. It needs further investigation. Could you please provide a simplified pipeline that is affected by this?

I could easily reproduce it locally with a test container of Confluent Kafka 7.6.0 and Flink 1.15.4. I ran this test with Beam 2.51.0 (the last version that works) and 2.56.0 (always fails).

Here is the test case, see the comments in the code.

    @Test
    void testBeamFromKafkaSourcesIssue() throws Exception {
        // this topic receives data
        String topicFull = "topic-in-1";

        // this topic is empty - the main ingredient to reproduce the issue
        // if I remove it from the input - it works on 2.56.0
        String topicEmpty = "topic-in-2";

        try (AdminClient adminClient = KafkaAdminClient.create(kafkaProperties.buildAdminProperties())) {
            adminClient
                    .createTopics(List.of(
                            new NewTopic(topicFull, 3, (short) 1),
                            new NewTopic(topicEmpty, 3, (short) 1)
                    ))
                    .all()
                    .get(5, TimeUnit.SECONDS);
        }

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProperties.buildProducerProperties())) {
            producer.send(new ProducerRecord<>(topicFull, 0, null, "payload-0")).get();
            producer.send(new ProducerRecord<>(topicFull, 1, null, "payload-11")).get();
            producer.send(new ProducerRecord<>(topicFull, 2, null, "payload-222")).get();
            producer.send(new ProducerRecord<>(topicFull, 0, null, "payload-0")).get();
        }

        PipelineOptions opts = PipelineOptionsFactory.create();
        opts.setRunner(TestFlinkRunner.class);

        Pipeline pipeline = Pipeline.create(opts);

        String bootstrapServers = String.join(",", kafkaProperties.getBootstrapServers());

        PCollection<KafkaRecord<String, String>> readFullTopic = pipeline
                .apply("ReadTopic1", createReader(topicFull, bootstrapServers));
        PCollection<KafkaRecord<String, String>> readEmptyTopic = pipeline
                .apply("ReadTopic2", createReader(topicEmpty, bootstrapServers));

        PCollectionList.of(List.of(readFullTopic, readEmptyTopic))
                .apply("Flatten", Flatten.pCollections())
                .apply("ToString", MapElements.into(strings()).via(r -> r.getKV().getValue()))
                .apply("LogInput", ParDo.of(LogContext.of("Input")))
                .apply("Window", Window.into(FixedWindows.of(Duration.standardSeconds(3))))
                .apply("Count", Count.perElement())
                .apply("LogOutput", ParDo.of(LogContext.of("Counts")));

        pipeline.run();
    }

    private static KafkaIO.Read<String, String> createReader(String kafkaTopic, String bootstrapServers) {
        return KafkaIO.<String, String>read()
                .withBootstrapServers(bootstrapServers)
                .withTopic(kafkaTopic)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(Map.of(
                        AUTO_OFFSET_RESET_CONFIG, "earliest"
                ));
    }

    @AllArgsConstructor(staticName = "of")
    static class LogContext<T> extends DoFn<T, T> {
        private final String prefix;

        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.printf("%s: Element: %s, pane: %s, ts: %s%n", prefix, c.element(), c.pane(), c.timestamp());
            c.output(c.element());
        }
    }

Output from LogContext with 2.56.0 (never fires a window, never outputs counts)

Input: Element: payload-0, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:31:08.674Z
Input: Element: payload-11, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:31:08.695Z
Input: Element: payload-222, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:31:08.696Z
Input: Element: payload-0, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:31:08.696Z

2.51.0 (expected)

Input: Element: payload-0, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:26:43.119Z
Input: Element: payload-11, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:26:43.140Z
Input: Element: payload-222, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:26:43.141Z
Input: Element: payload-0, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:26:43.141Z
Counts: Element: KV{payload-0, 2}, pane: PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}, ts: 2024-05-23T09:26:44.999Z
Counts: Element: KV{payload-222, 1}, pane: PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}, ts: 2024-05-23T09:26:44.999Z
Counts: Element: KV{payload-11, 1}, pane: PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}, ts: 2024-05-23T09:26:44.999Z
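
The `ts` of `09:26:44.999Z` on the Counts lines is consistent with how fixed windows are usually laid out: the inputs at `09:26:43.x` fall into the 3-second window `[09:26:42.000, 09:26:45.000)`, and the aggregate carries the last millisecond of that window. The sketch below redoes that arithmetic in plain Java, assuming epoch-aligned windows with zero offset; `FixedWindowSketch` and its methods are hypothetical names and this is not Beam's implementation.

```java
import java.time.Duration;
import java.time.Instant;

/** Sketch: epoch-aligned fixed-window arithmetic (zero offset assumed). */
final class FixedWindowSketch {

    /** Inclusive start of the fixed window that contains ts. */
    static Instant windowStart(Instant ts, Duration size) {
        long millis = ts.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, size.toMillis()));
    }

    /** Last timestamp inside the window: one millisecond before its exclusive end. */
    static Instant maxTimestamp(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size).minusMillis(1);
    }

    public static void main(String[] args) {
        Instant element = Instant.parse("2024-05-23T09:26:43.119Z");
        Duration size = Duration.ofSeconds(3);
        System.out.println(windowStart(element, size));  // 2024-05-23T09:26:42Z
        System.out.println(maxTimestamp(element, size)); // 2024-05-23T09:26:44.999Z
    }
}
```

With an event-time trigger, output like this is only expected once the watermark passes the end of the window, which is why no Counts lines appear in the 2.56.0 run.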

yelianevich avatar May 23 '24 09:05 yelianevich

Great, thanks! Looks like empty sources (before emitting first element) do not update downstream watermark. This is consistent with the observation above. I will look into it.

je-ik avatar May 23 '24 12:05 je-ik

@yelianevich Is this with default flags? Does this change when using --experiments=use_deprecated_read or --experiments=beam_fn_api?

je-ik avatar May 23 '24 13:05 je-ik

@je-ik I do not specify any additional flags in the test, so I think it uses defaults.

I ran the same test using --experiments=use_deprecated_read and it didn't change anything.

The test with --experiments=beam_fn_api started to produce aggregates.

PipelineOptions opts = PipelineOptionsFactory.fromArgs(new String[]{"--experiments=beam_fn_api"}).create();

yelianevich avatar May 23 '24 14:05 yelianevich

Yeah. This narrows it down to the source API. I was not able to identify a commit in 2.52.0 that could cause this, but I'll use your test locally. Thanks. 👍

je-ik avatar May 23 '24 15:05 je-ik

https://github.com/apache/beam/pull/31391

je-ik avatar May 24 '24 09:05 je-ik

@yelianevich @yardenbm Hi, can you please try applying #31391 and verify it fixes the issue?

je-ik avatar May 24 '24 09:05 je-ik