flink-connector-kafka
[FLINK-33466][Connectors/Kafka]: Bounded Kafka source never finishes after restore from savepoint
This fix snapshots the noMoreNewPartitionSplits variable, so that the signalNoMoreSplits signal is still sent to the readers after restoring from a snapshot.
As it stands, a bounded Kafka source never finishes after restoring from a snapshot, because the NoMoreSplits signal is never sent to the readers.
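To make the intent concrete, here is a rough sketch of the idea (not the actual change in the connector): it assumes the checkpointed enumerator state also carries the boolean flag, and that the enumerator re-signals NoMoreSplits to readers that register after the restore. Class, method, and field names below are illustrative only.

// Sketch only - illustrates the intent of the fix, not the connector's real classes.
class BoundedEnumeratorSketch {

    // The flag that is currently lost when the enumerator state is snapshotted.
    private boolean noMoreNewPartitionSplits = false;

    // 1) Persist the flag as part of the checkpointed enumerator state.
    boolean snapshotState() {
        return noMoreNewPartitionSplits;
    }

    // 2) Restore it when the enumerator is recreated from a savepoint.
    void restoreState(boolean snapshottedFlag) {
        this.noMoreNewPartitionSplits = snapshottedFlag;
    }

    // 3) Re-send NoMoreSplits to readers that register after the restore,
    //    so a bounded source can still reach FINISHED.
    void addReader(int subtaskId) {
        if (noMoreNewPartitionSplits) {
            signalNoMoreSplits(subtaskId);
        }
    }

    private void signalNoMoreSplits(int subtaskId) {
        // In the connector this corresponds to SplitEnumeratorContext#signalNoMoreSplits(subtaskId).
    }
}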
I'm aware that I have not yet been assigned to the corresponding issue, but I have yet to hear anything from the community (I have already tried submitting an issue and raising the problem here). So please let me know if I should go another way, or if you disagree with the approach - then we can work out something else together :)
I have created a test case using the Testcontainers framework and the Flink MiniCluster that showcases the problem - the test runs forever before the fix, but finishes successfully after the fix:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.stream.Collectors;
import java.time.Duration;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import com.tryg.reconciler.util.KafkaContainer; // project-specific helper wrapping a Testcontainers Kafka container
import static java.util.Map.entry;
public class BoundedKafkaSavepointIntegrationTests {
    private static final int NUM_PARTITIONS = 1;
    private static final Random random = new Random(BoundedKafkaSavepointIntegrationTests.class.hashCode());
    StreamExecutionEnvironment env;
    KafkaContainer kafkaContainer;
    Properties kafkaProperties;
    Configuration checkpointConfig;
    String lastCheckpointPath;
    String checkpointFolder;
    String inputTopicName;
    String outputTopicName;
    List<String> inputTopic;

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(1)
                            .setNumberTaskManagers(1)
                            .build());

    @Rule
    public TemporaryFolder folder = new TemporaryFolder();
    @Before
    public void setup() throws Exception {
        // Setup checkpoints
        folder.create();
        checkpointFolder = folder.newFolder().getAbsolutePath();

        // Create StreamExecutionEnvironment
        Configuration conf = new Configuration();
        conf.setString("state.checkpoints.dir", "file://" + checkpointFolder);
        env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Configure test environment
        env.setParallelism(NUM_PARTITIONS);

        // Start Kafka container
        kafkaContainer = new KafkaContainer(null);
        kafkaContainer.start();

        // Create topics
        inputTopicName = "input-topic-" + random.nextInt(Integer.MAX_VALUE);
        outputTopicName = "output-topic-" + random.nextInt(Integer.MAX_VALUE);
        kafkaContainer.createTopics(NUM_PARTITIONS, inputTopicName, outputTopicName);

        // Create Kafka properties
        kafkaProperties = new Properties();
        kafkaProperties.putAll(Map.ofEntries(
                entry(ConsumerConfig.GROUP_ID_CONFIG, "GROUP-ID-" + random.nextInt(Integer.MAX_VALUE)),
                entry("bootstrap.servers", kafkaContainer.getBootstrapServers()),
                entry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        ));

        checkpointConfig = new Configuration();
    }

    @After
    public void cleanUp() {
        kafkaContainer.stop();
        folder.delete();
    }
    @Test
    public void tests() throws Exception {
        inputTopic = List.of("1", "2", "3");
        produceDataToInputTopic(inputTopic);
        process(true, false);

        produceDataToInputTopic(List.of("4", "5", "6", "7", "8"));
        process(false, true);
    }

    private void produceDataToInputTopic(List<String> msgs) {
        KafkaRecordSerializationSchema<String> serializationSchema =
                KafkaRecordSerializationSchema.builder()
                        .setTopic(inputTopicName)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build();
        List<ProducerRecord<byte[], byte[]>> records = msgs.stream()
                .map(m -> serializationSchema.serialize(m, null, null))
                .collect(Collectors.toList());
        pushToKafka(records);
    }

    private void pushToKafka(List<ProducerRecord<byte[], byte[]>> records) {
        try (KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(kafkaProperties, new ByteArraySerializer(), new ByteArraySerializer())) {
            records.forEach(producer::send);
            producer.flush();
        }
    }
    private void process(boolean createCheckpoint, boolean fromCheckpoint) throws Exception {
        env.getCheckpointConfig().configure(checkpointConfig);

        TopicPartition tp = new TopicPartition(inputTopicName, 0);
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBounded(OffsetsInitializer.offsets(Map.of(tp, (long) inputTopic.size() + 1)))
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(new SimpleStringSchema()))
                .setTopics(inputTopicName)
                .setProperties(kafkaProperties)
                .build();

        DataStream<String> stream = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), inputTopicName)
                .name(inputTopicName)
                .uid(inputTopicName);
        stream
                .keyBy(ignored -> "sameKeyAlways")
                .addSink(new CollectSink());

        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        if (fromCheckpoint) {
            jobGraph.setSavepointRestoreSettings(
                    SavepointRestoreSettings.forPath(lastCheckpointPath));
            System.out.print("Resuming from savepoint " + lastCheckpointPath + "\n");
            System.out.flush();
        }

        // Submit job
        flinkCluster.before();
        ClusterClient<?> client = flinkCluster.getClusterClient();
        JobID jobId = client.submitJob(jobGraph).get();

        while (true) {
            JobStatus status = client.getJobStatus(jobId).get();
System.out.print("Status: " + status + ", all sunked values: " + CollectSink.values + "\n");
            System.out.flush();

            if (createCheckpoint && status.equals(JobStatus.RUNNING)) {
                lastCheckpointPath = client.stopWithSavepoint(
                        jobId, false, checkpointFolder, SavepointFormatType.CANONICAL).get();
                System.out.print("Stopping with savepoint\n");
                System.out.flush();
                break;
            }
            if (status.equals(JobStatus.FINISHED)) {
                break;
            }

            Thread.sleep(5000);
        }
    }
    private static class CollectSink implements SinkFunction<String> {
        public static final List<String> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(String value, SinkFunction.Context context) throws Exception {
            values.add(value);
        }
    }
}
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
@jwtryg Can you please rebase your PR to trigger CI?
@mas-chen Do you also want to take a look at this one?