[Bug]: KinesisIO source on FlinkRunner initializes the same splits twice
What happened?
Bug description
Setup details:
- FlinkRunner (Flink 1.15.4) (also replicated with Flink 1.18.2)
- KinesisIO (from beam-sdks-java-io-amazon-web-services2)
- Beam version 2.56.0
- Pipeline attached mode (false)
Bug details:
- When restoring from a snapshot on Flink, the org.apache.beam.sdk.io.aws2.kinesis.KinesisReader is assigned the same splits twice: once with snapshot state and once without. This leads to duplicate data being processed.
Replication steps:
- Start a Flink job with KinesisIO source.
- Stop the Flink job with a savepoint.
- Start the same Flink job from savepoint.
Logs:
- From the Flink TaskManager (worker node) log dump below, we can see that the splits for shardId-000000000000 to shardId-000000000003 are first initialized with checkpoint state AFTER_SEQUENCE_NUMBER (correct).
- Following that, we see that they are initialized again without checkpoint state, with AT_TIMESTAMP (not correct).
2024-05-16 12:29:43,263 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=org.apache.beam.sdk.io.aws2.kinesis.KinesisSource@269f1286, splitState.isNull=false, checkpointMark=null}]
2024-05-16 12:29:43,264 WARN org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSourceReader [] - AutoWatermarkInterval is not set, watermarks will be emitted at a default interval of 200 ms
2024-05-16 12:29:43,264 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: KDS Source/Read(KinesisSource) -> Flat Map -> ParDo(Anonymous)/ParMultiDo(Anonymous) -> KDS Sink/ParDo(Anonymous)/ParMultiDo(Anonymous) (1/1)#0 (9e813ffa491b6a4d44e7860742f1576b) switched from INITIALIZING to RUNNING.
2024-05-16 12:29:43,265 WARN org.apache.beam.sdk.coders.SerializableCoder [] - Can't verify serialized elements of type KinesisReaderCheckpoint have well defined equals method. This may produce incorrect results on some PipelineRunner implementations
2024-05-16 12:29:43,265 INFO org.apache.beam.sdk.io.aws2.kinesis.KinesisSource [] - Got checkpoint mark [Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000000: 49651326501327272958725198009666143092350196858539212802 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000001: 49651326500635949857570748692274909048439253764329701394 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000002: 49651326503155934065004709107267236287428903985330782242 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000003: 49651326499966927501614829993237864477127027156192329778 0]
2024-05-16 12:29:43,266 INFO org.apache.beam.sdk.io.aws2.kinesis.KinesisSource [] - Creating new reader using [Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000000: 49651326501327272958725198009666143092350196858539212802 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000001: 49651326500635949857570748692274909048439253764329701394 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000002: 49651326503155934065004709107267236287428903985330782242 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000003: 49651326499966927501614829993237864477127027156192329778 0]
2024-05-16 12:29:43,268 INFO org.apache.beam.sdk.io.aws2.kinesis.KinesisReader [] - Starting reader using [Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000000: 49651326501327272958725198009666143092350196858539212802 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000001: 49651326500635949857570748692274909048439253764329701394 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000002: 49651326503155934065004709107267236287428903985330782242 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000003: 49651326499966927501614829993237864477127027156192329778 0]
2024-05-16 12:29:43,272 INFO org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000000, ShardIteratorType=AT_SEQUENCE_NUMBER, StartingSequenceNumber=49651326501327272958725198009666143092350196858539212802)
2024-05-16 12:29:43,795 INFO org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000001, ShardIteratorType=AT_SEQUENCE_NUMBER, StartingSequenceNumber=49651326500635949857570748692274909048439253764329701394)
2024-05-16 12:29:43,891 INFO org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000002, ShardIteratorType=AT_SEQUENCE_NUMBER, StartingSequenceNumber=49651326503155934065004709107267236287428903985330782242)
2024-05-16 12:29:43,983 INFO org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000003, ShardIteratorType=AT_SEQUENCE_NUMBER, StartingSequenceNumber=49651326499966927501614829993237864477127027156192329778)
2024-05-16 12:29:44,080 INFO org.apache.beam.sdk.io.aws2.kinesis.ShardReadersPool [] - Starting to read ExampleInputStream stream from [shardId-000000000000, shardId-000000000001, shardId-000000000002, shardId-000000000003] shards
2024-05-16 12:29:44,266 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=org.apache.beam.sdk.io.aws2.kinesis.KinesisSource@2a249887, splitState.isNull=true, checkpointMark=null}]
2024-05-16 12:29:44,266 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Received NoMoreSplits signal from enumerator.
2024-05-16 12:29:44,898 INFO org.apache.beam.sdk.io.aws2.kinesis.KinesisSource [] - No checkpointMark specified, fall back to initial [Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000000: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000001: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000002: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000003: null null]
2024-05-16 12:29:44,898 INFO org.apache.beam.sdk.io.aws2.kinesis.KinesisSource [] - Creating new reader using [Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000000: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000001: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000002: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000003: null null]
2024-05-16 12:29:44,899 INFO org.apache.beam.sdk.io.aws2.kinesis.KinesisReader [] - Starting reader using [Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000000: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000001: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000002: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000003: null null]
2024-05-16 12:29:44,899 INFO org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000000, ShardIteratorType=AT_TIMESTAMP, Timestamp=2024-05-16T10:59:38.912Z)
2024-05-16 12:29:45,311 INFO org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000001, ShardIteratorType=AT_TIMESTAMP, Timestamp=2024-05-16T10:59:38.912Z)
2024-05-16 12:29:45,412 INFO org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000002, ShardIteratorType=AT_TIMESTAMP, Timestamp=2024-05-16T10:59:38.912Z)
2024-05-16 12:29:45,514 INFO org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000003, ShardIteratorType=AT_TIMESTAMP, Timestamp=2024-05-16T10:59:38.912Z)
2024-05-16 12:29:45,620 INFO org.apache.beam.sdk.io.aws2.kinesis.ShardReadersPool [] - Starting to read ExampleInputStream stream from [shardId-000000000000, shardId-000000000001, shardId-000000000002, shardId-000000000003] shards
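To make the failure mode in the log dump concrete, here is a minimal plain-Java model with no Beam or Flink dependency. It assumes, based only on the log messages above, that a split whose state was restored resumes from its checkpoint, while a split with splitState.isNull=true falls back to the configured initial position. SplitStartModel and its methods are hypothetical names, not Beam classes; the point is that nothing in this path stops the same split index from starting a second reader.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of how a restored split and a stateless split pick their starting point. */
final class SplitStartModel {

    /** A split as seen by the reader: an index plus an optional restored checkpoint. */
    static final class Split {
        final int splitIndex;
        final String checkpoint; // null models splitState.isNull=true

        Split(int splitIndex, String checkpoint) {
            this.splitIndex = splitIndex;
            this.checkpoint = checkpoint;
        }
    }

    /** Resume from the checkpoint if present, otherwise fall back to the initial position. */
    static String startingPoint(Split split, String initialPosition) {
        return split.checkpoint != null ? split.checkpoint : initialPosition;
    }

    /** Every added split starts its own reader; there is no guard against duplicate indices. */
    static List<String> startReaders(List<Split> addedSplits, String initialPosition) {
        List<String> readers = new ArrayList<>();
        for (Split split : addedSplits) {
            readers.add("split " + split.splitIndex + " @ " + startingPoint(split, initialPosition));
        }
        return readers;
    }

    public static void main(String[] args) {
        List<Split> added = new ArrayList<>();
        added.add(new Split(0, "AFTER_SEQUENCE_NUMBER")); // restored state
        added.add(new Split(0, null));                     // same index, no state
        startReaders(added, "AT_TIMESTAMP").forEach(System.out::println);
    }
}
```

Running main prints one reader per added split, so split 0 is read both from AFTER_SEQUENCE_NUMBER and from AT_TIMESTAMP, mirroring the two bursts of getIterator requests in the logs.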
Issue Priority
Priority: 3 (minor)
Issue Components
- [ ] Component: Python SDK
- [x] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [x] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
Here is the Flink code to reproduce the issue:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.apache.flink.example</groupId>
    <artifactId>beam</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>Apache Flink Beam Application</name>
    <packaging>jar</packaging>
    <properties>
        <flink.version>1.15.2</flink.version>
        <logback.version>1.4.14</logback.version>
        <main-class>org.apache.flink.example.BeamApplication</main-class>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-math3</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-flink-1.15</artifactId>
            <version>2.56.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
            <version>2.56.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-core</exclude>
                                    <exclude>org.apache.flink:flink-annotations</exclude>
                                    <exclude>org.apache.flink:flink-metrics-core</exclude>
                                    <exclude>org.apache.flink:flink-shaded-*</exclude>
                                    <exclude>org.apache.flink:flink-table-*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>${main-class}</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Java class
package org.apache.flink.example;

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisPartitioner;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.joda.time.Duration;
import org.joda.time.Instant;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;

import java.nio.charset.StandardCharsets;

public class BeamApplication {

    public static void main(final String... args) throws Exception {
        // Parse command-line options (e.g. --flinkMaster, --savepointPath) and force the Flink runner.
        FlinkPipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        // Detached submission, as in "Pipeline attached mode (false)" above.
        options.setAttachedMode(false);

        Pipeline pipeline = Pipeline.create(options);
        pipeline
                // Source: without a checkpoint, start reading 30 minutes in the past.
                .apply("KDS Source", KinesisIO.read()
                        .withStreamName("ExampleInputStream")
                        .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                        .withInitialTimestampInStream(Instant.now().minus(Duration.standardMinutes(30)))
                        .withClientConfiguration(ClientConfiguration.builder()
                                .region(Region.US_EAST_1)
                                .build()))
                // Print each record and forward its string form.
                .apply(ParDo.of(new DoFn<KinesisRecord, String>() {
                    @ProcessElement
                    public void processElement(@Element KinesisRecord record, OutputReceiver<String> out) {
                        System.out.println(record.toString());
                        out.output(record.toString());
                    }
                }))
                // Sink: write the strings to a second stream.
                .apply("KDS Sink", KinesisIO.<String>write()
                        .withStreamName("ExampleOutputStream")
                        .withClientConfiguration(ClientConfiguration.builder()
                                .region(Region.US_EAST_1)
                                .build())
                        .withSerializer((SerializableFunction<String, byte[]>) input -> input.getBytes(StandardCharsets.UTF_8))
                        .withPartitioner(KinesisPartitioner.explicitRandomPartitioner(1)));

        pipeline.run();
    }
}
@je-ik, is this the same issue as https://github.com/apache/beam/issues/30903?
I noticed you fixed it and the problem statement seems similar, but please let me know if this is something different, as I am getting duplicated data on 2.56 when restoring from a Flink savepoint.
I suppose this is a similar but different issue, probably caused by the same underlying bug. #30903 fixed Impulse only. Does using --experiments=beam_fn_api fix the issue?
@je-ik Hi, yes, that did fix the issue. Thank you! For my understanding, what does this option do exactly? And should I expect any performance degradation?
I am actually noticing a lot of backpressure with this approach, even though the downstream operators have low CPU usage. Is the fix for the root cause relatively straightforward, so that I could implement it in a fork of the repo, or is it more involved?
I don't know the root cause; it seems that Flink does not send the snapshot state after a restore from a savepoint. I observed this with Impulse (I suspected that it affects only bounded sources running in unbounded mode, but that does not seem to be the case). It might be a Beam bug or a Flink bug.
> Hi, yes, that did fix the issue. Thank you! For my understanding, what does this option do exactly? And should I expect any performance degradation?
The flag turns on a different expansion for the Read transform: it uses a splittable DoFn (SDF), which uses Impulse, which was fixed earlier. Performance should be similar to the classic Read.
I'm surprised this is a bug, considering that restoring from a Flink savepoint is a pretty common use case. Is it possible that some configuration is missing somewhere? I haven't been able to find anyone else online experiencing the same issue, but I was able to replicate it using both Kinesis and Kafka. Given how common this use case is, I'm not 100% sure this is in fact a bug; most likely it is some user error on my part.
I can make do without savepoints by using Kafka offset commits and consumer groups to ensure no data is lost, but I can't figure out a way to avoid losing data that is windowed but not yet triggered when the Flink application is stopped. Maybe you know of a solution to that problem?
It seems that many of the subtasks aren't being utilized when stripping IDs with beam_fn_api, even though there are 20 shards and the parallelism is 24 (in theory, only 4 subtasks should be idle).
> I'm surprised this is a bug, considering that restoring from a Flink savepoint is a pretty common use case. Is it possible that some configuration is missing somewhere? I haven't been able to find anyone else online experiencing the same issue, but I was able to replicate it using both Kinesis and Kafka. Given how common this use case is, I'm not 100% sure this is in fact a bug; most likely it is some user error on my part.
Can you please provide a minimal example and setup to reproduce the behavior?
> I can make do without savepoints by using Kafka offset commits and consumer groups to ensure no data is lost, but I can't figure out a way to avoid losing data that is windowed but not yet triggered when the Flink application is stopped. Maybe you know of a solution to that problem?
You can drain the pipeline; see https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#terminating-a-job
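Draining helps with the windowed-but-not-triggered data because, per the Flink CLI documentation linked above, stop --drain emits MAX_WATERMARK before taking the final savepoint, which fires all pending event-time windows. The same documentation advises against resuming a job from a drained savepoint, so this is meant for stopping a job permanently. The toy model below (plain Java, hypothetical DrainModel class, not Beam or Flink windowing code) shows the effect with fixed windows: a window fires only once the watermark reaches its end, and a maximal watermark flushes everything that is still buffered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of fixed event-time windows that fire when the watermark passes their end. */
final class DrainModel {
    private final long windowSizeMillis;
    // window start -> number of buffered elements, ordered by start time
    private final TreeMap<Long, Integer> pending = new TreeMap<>();

    DrainModel(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    /** Buffers an element in the window [start, start + size) that contains its timestamp. */
    void add(long eventTimeMillis) {
        long start = eventTimeMillis - Math.floorMod(eventTimeMillis, windowSizeMillis);
        pending.merge(start, 1, Integer::sum);
    }

    /** Fires (removes) every window whose end is at or before the watermark; returns their starts. */
    List<Long> advanceWatermark(long watermarkMillis) {
        List<Long> fired = new ArrayList<>();
        while (!pending.isEmpty()) {
            Map.Entry<Long, Integer> first = pending.firstEntry();
            if (first.getKey() + windowSizeMillis > watermarkMillis) {
                break; // this window and all later ones are still open
            }
            fired.add(first.getKey());
            pending.pollFirstEntry();
        }
        return fired;
    }

    int pendingWindows() {
        return pending.size();
    }

    public static void main(String[] args) {
        DrainModel windows = new DrainModel(60_000L);
        windows.add(10_000L);
        windows.add(70_000L);
        System.out.println("fired at watermark 65s: " + windows.advanceWatermark(65_000L));
        // A drain behaves like jumping the watermark to the maximum.
        System.out.println("fired on drain: " + windows.advanceWatermark(Long.MAX_VALUE));
    }
}
```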
> It seems that many of the subtasks aren't being utilized when stripping IDs with beam_fn_api, even though there are 20 shards and the parallelism is 24 (in theory, only 4 subtasks should be idle).
This is related to how Flink computes target splits. It is affected by the maximum parallelism (which is computed automatically if not specified). You can try increasing it via --maxParallelism=32768 (32768 is the maximum value); this could make the assignment more balanced.
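For intuition about why the maximum parallelism matters, the sketch below re-implements, from memory of Flink's KeyGroupRangeAssignment, the default max-parallelism rule (1.5x the parallelism rounded up to a power of two, clamped to [128, 32768]) and the mapping of key groups to subtasks in contiguous ranges. Treat it as an approximation rather than Flink's actual code; KeyGroupModel is a hypothetical name, and it does not model how Beam derives keys or splits from Kinesis shards. It only shows the Flink-side arithmetic: with parallelism 24 the default is 128 key groups, so some subtasks own 6 groups and others 5 (a 20% difference), while with 32768 the difference drops below 0.1%.

```java
import java.util.Arrays;

/** Approximation of Flink's key-group arithmetic, re-implemented here for illustration. */
final class KeyGroupModel {
    static final int LOWER_BOUND_MAX_PARALLELISM = 1 << 7;  // 128
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15; // 32768

    /** Smallest power of two >= x, for 1 <= x <= 2^30. */
    static int roundUpToPowerOfTwo(int x) {
        x = x - 1;
        x |= x >>> 1;
        x |= x >>> 2;
        x |= x >>> 4;
        x |= x >>> 8;
        x |= x >>> 16;
        return x + 1;
    }

    /** Default when no max parallelism is configured: 1.5x parallelism, rounded up and clamped. */
    static int defaultMaxParallelism(int parallelism) {
        int candidate = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(Math.max(candidate, LOWER_BOUND_MAX_PARALLELISM), UPPER_BOUND_MAX_PARALLELISM);
    }

    /** Subtask owning a key group; key groups are split into contiguous ranges. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    /** Number of key groups owned by each subtask. */
    static int[] keyGroupsPerSubtask(int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            counts[operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int parallelism = 24;
        int defaultMax = defaultMaxParallelism(parallelism);
        System.out.println("default maxParallelism: " + defaultMax);
        System.out.println(Arrays.toString(keyGroupsPerSubtask(defaultMax, parallelism)));
        System.out.println(Arrays.toString(keyGroupsPerSubtask(UPPER_BOUND_MAX_PARALLELISM, parallelism)));
    }
}
```

Flink does not let you change the max parallelism of stateful operators when restoring from a savepoint, so the value has to be chosen up front, before the first savepoint you intend to restore from.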
Thanks for the suggestions; I will give them a try. I believe the first comment in the ticket provides a simple pipeline that exhibits this behavior on the Flink runner, but if that doesn't work, I'm happy to provide another. The example also submits the job in detached mode, which may be related, although I have seen similar behavior without it. I appreciate your help looking into this; if there's anything I can assist with, please let me know.
To mimic the local setup I used:
- I ran flink/start-cluster.sh
- used the flink run command with the -d flag
- stopped the job with a savepoint: ./flink/bin/flink stop -p flink/savepoints cf78a44e6b10ab7062d3c02bb7d4e052
- and then restarted it using flink run with the savepoint path.
Afterwards, I searched the task manager logs for "Starting getIterator request" and found 6 entries at the time my app restarted: 3 at sequence number (AT_SEQUENCE_NUMBER) and 3 at LATEST. I am not sure why the LATEST ones show up, and I didn't see anything in the source code that would cause this.
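Counting these log lines by hand gets tedious with many shards. A small helper like the one below groups "Starting getIterator request" lines by shard and flags shards that were opened more than once. The regular expression assumes the GetShardIteratorRequest(... ShardId=..., ShardIteratorType=...) format quoted in this issue and may need adjusting for other Beam versions; GetIteratorAudit is a hypothetical name. Feed it only the lines from a single restore of a single job, since ordinary restarts also log these requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: groups "Starting getIterator request" log lines by shard. */
final class GetIteratorAudit {
    // Assumes the log format shown in this issue: "... ShardId=shardId-..., ShardIteratorType=TYPE..."
    private static final Pattern REQUEST = Pattern.compile(
            "Starting getIterator request .*ShardId=(shardId-\\d+), ShardIteratorType=([A-Z_]+)");

    /** Returns shard id -> iterator types requested for it, in log order. */
    static Map<String, List<String>> iteratorTypesByShard(List<String> logLines) {
        Map<String, List<String>> byShard = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = REQUEST.matcher(line);
            if (m.find()) {
                byShard.computeIfAbsent(m.group(1), k -> new ArrayList<>()).add(m.group(2));
            }
        }
        return byShard;
    }

    /** Shards with more than one iterator request, i.e. candidates for duplicate readers. */
    static List<String> shardsOpenedTwice(List<String> logLines) {
        List<String> result = new ArrayList<>();
        iteratorTypesByShard(logLines).forEach((shard, types) -> {
            if (types.size() > 1) {
                result.add(shard);
            }
        });
        return result;
    }
}
```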
I also switched to Kafka and noticed the same behavior, so it seems to be related to the runner. I was unable to fix the performance issues with beam_fn_api and noticed that the backpressure was causing my data to arrive in waves. The CPU chart was very cyclic, with peaks of 99% and troughs of 8%, which leads me to believe that this pipeline option was causing data to build up and then arrive in a rush that spiked the CPU.
I can make do with Kafka offset commits for now, but if there are any pointers on how to fix this in the Beam source code, I'd be happy to take a look and even submit a PR to be included in version 2.57. That said, I'm still hoping the issue is somewhere on my end and can be fixed fairly easily.
Hi @akashk99, just to be sure: do you observe the same behavior when not using flink run, but running the job as a "standard" Java app (java -cp <jar> class) and passing the Flink configuration using Beam command-line args (--runner=flink --flinkMaster=... --savepointPath=...)?
Hi @je-ik, I was just able to reproduce the issue by running the jar file manually.
I started the job by running java -cp <jar> <class> --runner=flink --flinkMaster=... and then used flink stop with a savepoint. Afterwards, I reran java -cp <jar> <class> --runner=flink --flinkMaster=... --savepointPath=... with the savepoint I had just taken. In the task manager logs, I see:
Terminal Log:
Jun 06, 2024 12:55:08 PM org.apache.flink.client.program.rest.RestClusterClient lambda$null$6
INFO: Successfully submitted job <jobName> (6651a9570e4c9d1df81539b07e6e91ce) to 'http://localhost:8081'.
Task Manager Logs:
2024-06-06 12:55:11,979 INFO org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient [] - Starting getIterator request GetShardIteratorRequest(StreamName=<streamName>, ShardId=shardId-000000000012, ShardIteratorType=AT_SEQUENCE_NUMBER,
2024-06-06 12:55:13,279 INFO org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient [] - Starting getIterator request GetShardIteratorRequest(StreamName=<streamName>, ShardId=shardId-000000000012, ShardIteratorType=LATEST)
This was a few seconds after the job was submitted. I trimmed the output, but these two log lines were there for all of my shards.
Hi, we are seeing the same behavior on our pipeline.
Logs from a Task Manager
2024-07-08 14:29:13,867 WARN org.apache.beam.sdk.coders.SerializableCoder [] - Can't verify serialized elements of type KinesisReaderCheckpoint have well defined equals method. This may produce incorrect results on some PipelineRunner implementations
2024-07-08 14:29:13,867 INFO org.apache.beam.sdk.io.kinesis.KinesisSource [] - Creating new reader using [Checkpoint AFTER_SEQUENCE_NUMBER for stream <streamName>, shard shardId-000000002111: 49653574367537750625153914864609254626539224692753990642]
2024-07-08 14:29:13,945 INFO org.apache.beam.sdk.io.kinesis.KinesisReader [] - Starting reader using [Checkpoint AFTER_SEQUENCE_NUMBER for stream <streamName>, shard shardId-000000002111: 49653574367537750625153914864609254626539224692753990642]
2024-07-08 14:29:14,122 INFO org.apache.beam.sdk.io.kinesis.ShardReadersPool [] - Starting to read <streamName> stream from [shardId-000000002111] shards
2024-07-08 14:29:14,772 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Adding splits [FlinkSourceSplit{splitIndex=2, beamSource=org.apache.beam.sdk.io.kinesis.KinesisSource@a09dd60, splitState.isNull=true, checkpointMark=null}]
2024-07-08 14:29:14,773 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Received NoMoreSplits signal from enumerator.
2024-07-08 14:29:14,775 INFO org.apache.beam.sdk.io.kinesis.KinesisSource [] - Creating new reader using [Checkpoint LATEST for stream <streamName>, shard shardId-000000002111: null]
2024-07-08 14:29:14,778 INFO org.apache.beam.sdk.io.kinesis.KinesisReader [] - Starting reader using [Checkpoint LATEST for stream <streamName>, shard shardId-000000002111: null]
2024-07-08 14:29:14,830 INFO org.apache.beam.sdk.io.kinesis.ShardReadersPool [] - Starting to read <streamName> stream from [shardId-000000002111] shards
In our case we are using:
- FlinkRunner 1.17.2
- Beam 2.56.0
- KinesisIO (the deprecated one from beam-sdks-java-io-kinesis)
We hit the same issue. Is there any workaround available?
- FlinkRunner 1.18
- Beam 2.57.0
- KafkaIO
"Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@5420eacc, splitState.isNull=false, checkpointMark=null}]",
"Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@174df24, splitState.isNull=true, checkpointMark=null}]",
@weijiequ Do you use --experiments=use_deprecated_read? And does the behavior change if you add/remove it?
Hi @je-ik, I tried with --experiments=use_deprecated_read and the behavior is the same: duplicate Source.Reader instances are created.
In a thread dump, we can see multiple KafkaConsumerPoll-thread threads running, reading from the same partition concurrently.
"KafkaConsumerPoll-thread" Id=153 RUNNABLE
"KafkaConsumerPoll-thread" Id=167 RUNNABLE
When running without a savepoint, only one KafkaConsumerPoll-thread is running.
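When reproducing this in a test harness that runs inside the same JVM, the thread-dump check can be automated with Thread.getAllStackTraces(). The hypothetical helper below counts live threads with an exact name; outside the JVM, jstack <pid> gives the same information. More matches for KafkaConsumerPoll-thread than the number of readers you expect would point to the duplicate reader described here.

```java
/** Hypothetical debugging helper: counts live threads whose name matches exactly. */
final class ThreadNameCounter {

    static long countThreadsNamed(String name) {
        // getAllStackTraces() returns a snapshot map keyed by every live thread in this JVM.
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.isAlive() && name.equals(t.getName()))
                .count();
    }

    public static void main(String[] args) {
        System.out.println(countThreadsNamed("KafkaConsumerPoll-thread"));
    }
}
```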
Looking further into the stack traces, the splits are first added at:
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase.addSplits(FlinkSourceReaderBase.java:198)
org.apache.flink.streaming.api.operators.SourceOperator.open(SourceOperator.java:344)
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:924)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
java.base/java.lang.Thread.run(Thread.java:829)
Then the normal code path for adding splits (the same one used when running without a savepoint) adds the same split again:
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase.addSplits(FlinkSourceReaderBase.java:198)
org.apache.flink.streaming.api.operators.SourceOperator.handleAddSplitsEvent(SourceOperator.java:590)
org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:567)
org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
....
How about adding a duplicate check by split id here? https://github.com/apache/beam/blob/release-2.57.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java#L241
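The proposed duplicate check could look roughly like the following plain-Java sketch. It is not Beam's FlinkSourceReaderBase: DedupingSplitReader and its methods are hypothetical, and a split is reduced to its index plus a starting position. It relies on the restored split arriving first, which is the order the two stack traces above show (SourceOperator.open before handleAddSplitsEvent).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical reader model that ignores a split index it is already reading. */
final class DedupingSplitReader {
    // splitIndex -> starting position of the reader created for that split
    private final Map<Integer, String> activeSplits = new LinkedHashMap<>();

    /** Adds splits, skipping any index that is already active; returns the indices actually started. */
    List<Integer> addSplits(List<Integer> splitIndices, String startingPosition) {
        List<Integer> started = new ArrayList<>();
        for (int index : splitIndices) {
            // putIfAbsent returns null only when no reader exists yet for this index.
            if (activeSplits.putIfAbsent(index, startingPosition) == null) {
                started.add(index);
            }
        }
        return started;
    }

    String startingPositionOf(int splitIndex) {
        return activeSplits.get(splitIndex);
    }
}
```

With this ordering, the restored split keeps its checkpoint and the later stateless copy is dropped. Whether such a check would also interfere with legitimate re-assignment of a split (for example after addSplitsBack) is not addressed by this sketch.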
I just found a possibly related SO question: https://stackoverflow.com/questions/79088562/flink-job-processes-kafka-messages-twice-after-jobmanager-failover-in-ha-mode
Could you verify whether the behavior is the same in your case? I.e., does killing the TMs after a savepoint initialize the splits only once (because they then start from a checkpoint, not from the savepoint)?
It looks like a very similar issue. We do a normal shutdown with a savepoint and then restore from the savepoint. This causes the duplicate message issue (all messages produced after the restart are consumed twice; existing messages are fine). Similar to the SO post, we also verified that the expected number of tasks are up and running. We additionally took a thread dump of the subtask and found two running Kafka poll threads. It's not a Kafka offset commit issue: we checked the status of the consumers, and the committed offset is up to date. All newly produced messages on the topic are consumed twice by the task. I didn't try killing the TMs; it's possible the issue would go away that way, since the job would then restore from a checkpoint.
@weijiequ I created a PR that seems to fix the issue in my local setup. Can you apply the patch and verify it on your side, please?
https://github.com/apache/beam/pull/33606
@je-ik Verified, it works, thank you!! When starting without a savepoint, the splits are added at "Starting source". When starting with a savepoint, this step is skipped, so no duplicate splits are added.
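The behavior described here can be modeled as follows: a fresh start hands out splits at "Starting source", while a restore keeps only what each subtask had in its snapshot. This is a hypothetical sketch of the idea, not the code in PR #33606; StartOrRestoreModel is a made-up name, and the round-robin assignment on a fresh start is an assumption. In this simplified model, restoring with a higher parallelism leaves the extra subtasks without splits, because no fresh assignment happens on restore.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model: initial split assignment happens only on a fresh start, never on restore. */
final class StartOrRestoreModel {

    /** Returns subtask index -> split indices that subtask will read. */
    static Map<Integer, List<Integer>> assign(
            int parallelism, int numSplits, Map<Integer, List<Integer>> restored) {
        Map<Integer, List<Integer>> assignment = new TreeMap<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            assignment.put(subtask, new ArrayList<>());
        }
        if (restored != null) {
            // Restore path: each subtask keeps exactly the splits from its snapshot.
            restored.forEach((subtask, splits) ->
                    assignment.computeIfAbsent(subtask, s -> new ArrayList<>()).addAll(splits));
            return assignment;
        }
        // Fresh start ("Starting source"): assumed round-robin of splits over subtasks.
        for (int split = 0; split < numSplits; split++) {
            assignment.get(split % parallelism).add(split);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> fresh = assign(4, 4, null);
        System.out.println("fresh start: " + fresh);
        System.out.println("restored at parallelism 8: " + assign(8, 8, fresh));
    }
}
```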
@je-ik After applying this change, I noticed a side-effect: when I scale out my job (for example, increasing the parallelism from 4 to 8) and then restart from a savepoint, the additional splits (indexes 4, 5, 6, 7) never start, while the original four splits (indexes 0, 1, 2, 3) continue running as expected.
Good catch! :+1:
The reason is that the splits are statically assigned to the workers after the initial split. It seems the implementation of addSplitsBack is wrong; I'll look into that.
Thanks @je-ik , once you have the updated patch, I can also verify on my local.
@weijiequ can you try setting --maxParallelism=32768, start and rescale the job again?
@je-ik Confirmed: with --maxParallelism=32768 (set before scaling out), the additional splits come up and run after the scale-out. So should we apply this in our environment, or will there be a permanent fix for this as well?
You can actually use a lower number, as long as it fits the maximum scale you can reach.
Got it. To double-check the currently suggested solution:
- apply the patch shared two days ago
- set --maxParallelism as a start option
Am I right? @je-ik
Yes.