flink-connector-kafka icon indicating copy to clipboard operation
flink-connector-kafka copied to clipboard

[FLINK-33466][Connectors/Kafka]: Bounded Kafka source never finishes after restore from savepoint

Open jwtryg opened this issue 1 year ago • 3 comments

This fix snapshots the noMoreNewPartitionSplits variable. This is done to make sure that the signalNoMoreSplits signal is sent after restoring from a snapshot.

Currently, a bounded Kafka source never finishes after being restored from a snapshot, because the NoMoreSplits signal is never sent to the readers.

I'm aware that I have not yet been assigned to a corresponding issue, but I have yet to hear anything from the community (I have already tried submitting an issue and raising the problem here). So, please let me know if I should take a different route, or if you disagree with the approach - then we can work out something else together :)

I have created a test case using the testcontainers framework and the Flink MiniCluster that showcases the problem - the test runs forever before the fix, but successfully finishes after the fix:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.stream.Collectors;
import java.time.Duration;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import com.tryg.reconciler.util.KafkaContainer;
import static java.util.Map.entry;

/**
 * Integration test that runs a bounded {@link KafkaSource} job, stops it with a savepoint,
 * produces more records and then restores the job from that savepoint.
 *
 * <p>The restored job is polled until it reports {@link JobStatus#FINISHED}; the test therefore
 * only completes if the bounded source actually terminates after the restore.
 */
public class BoundedKafkaSavepointIntegrationTests {
    private static final int NUM_PARTITIONS = 1;

    /** Pause between two job-status polls, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 5000;

    /** Used to generate unique topic and consumer-group names per test run. */
    private static final Random RANDOM =
        new Random(BoundedKafkaSavepointIntegrationTests.class.hashCode());

    StreamExecutionEnvironment env;
    KafkaContainer kafkaContainer;
    Properties kafkaProperties;

    Configuration checkpointConfig;
    String lastCheckpointPath;
    String checkpointFolder;

    String inputTopicName;
    String outputTopicName;
    List<String> inputTopic;

    /** Shared mini cluster; started and stopped by JUnit through the class rule. */
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(1)
                .setNumberTaskManagers(1)
                .build());

    /** Root for checkpoint/savepoint data; created and deleted by JUnit through the rule. */
    @Rule
    public TemporaryFolder folder = new TemporaryFolder();

    /**
     * Creates the execution environment, starts the Kafka container, creates the input/output
     * topics and assembles the Kafka client properties.
     *
     * @throws Exception if the temporary folder cannot be created
     */
    @Before
    public void setup() throws Exception {
        // Setup checkpoints. The TemporaryFolder rule has already created its root directory,
        // so calling folder.create() again would only orphan the first directory.
        checkpointFolder = folder.newFolder().getAbsolutePath();

        // Create StreamExecutionEnvironment
        Configuration conf = new Configuration();
        conf.setString("state.checkpoints.dir", "file://" + checkpointFolder);
        env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // configure test environment
        env.setParallelism(NUM_PARTITIONS);

        // Start kafka container
        kafkaContainer = new KafkaContainer(null);
        kafkaContainer.start();

        // Create topics
        inputTopicName = "input-topic-" + RANDOM.nextInt(Integer.MAX_VALUE);
        outputTopicName = "output-topic-" + RANDOM.nextInt(Integer.MAX_VALUE);
        kafkaContainer.createTopics(NUM_PARTITIONS, inputTopicName, outputTopicName);

        // Create kafka properties
        kafkaProperties = new Properties();
        kafkaProperties.putAll(Map.ofEntries(
            entry(ConsumerConfig.GROUP_ID_CONFIG, "GROUP-ID-" + RANDOM.nextInt(Integer.MAX_VALUE)),
            entry("bootstrap.servers", kafkaContainer.getBootstrapServers()),
            entry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        ));

        checkpointConfig = new Configuration();
    }

    /** Stops the Kafka container. The temporary folder is removed by its rule. */
    @After
    public void cleanUp() {
        kafkaContainer.stop();
    }

    /**
     * Runs the job once and stops it with a savepoint, produces additional records, then runs
     * the job again restored from that savepoint and waits for it to finish.
     *
     * @throws Exception if job submission, the savepoint or status polling fails
     */
    @Test
    public void tests() throws Exception {
        inputTopic = List.of("1", "2", "3");
        produceDataToInputTopic(inputTopic);
        process(true, false);
        produceDataToInputTopic(List.of("4", "5", "6", "7", "8"));
        process(false, true);
    }

    /**
     * Serializes the given strings as record values for the input topic and sends them to Kafka.
     *
     * @param msgs the record values to produce
     */
    private void produceDataToInputTopic(List<String> msgs) {
        KafkaRecordSerializationSchema<String> serializationSchema =
            KafkaRecordSerializationSchema.builder()
                .setTopic(inputTopicName)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build();

        List<ProducerRecord<byte[], byte[]>> records = msgs.stream()
            .map(m -> serializationSchema.serialize(m, null, null))
            .collect(Collectors.toList());

        pushToKafka(records);
    }

    /**
     * Sends the records with a short-lived producer and flushes before the producer is closed.
     *
     * @param records the records to send
     */
    private void pushToKafka(List<ProducerRecord<byte[], byte[]>> records) {
        try (KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(kafkaProperties, new ByteArraySerializer(), new ByteArraySerializer())) {
            records.forEach(producer::send);
            producer.flush();
        }
    }

    /**
     * Builds the bounded Kafka job, submits it to the mini cluster and polls its status.
     *
     * @param createCheckpoint if {@code true}, the job is stopped with a savepoint as soon as it
     *     is observed {@code RUNNING}, and the savepoint path is stored in
     *     {@link #lastCheckpointPath}
     * @param fromCheckpoint if {@code true}, the job is restored from {@link #lastCheckpointPath}
     * @throws IllegalStateException if the job reaches a globally terminal state other than
     *     {@code FINISHED}
     * @throws Exception if submission, the savepoint or status polling fails
     */
    private void process(boolean createCheckpoint, boolean fromCheckpoint) throws Exception {
        env.getCheckpointConfig().configure(checkpointConfig);
        TopicPartition tp = new TopicPartition(inputTopicName, 0);

        // NOTE(review): the stopping offset is one past the number of initially produced
        // records, presumably so that the first run cannot reach the bound on its own - confirm.
        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBounded(OffsetsInitializer.offsets(Map.of(tp, (long) inputTopic.size() + 1)))
            .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(new SimpleStringSchema()))
            .setTopics(inputTopicName)
            .setProperties(kafkaProperties)
            .build();

        DataStream<String> stream = env
            .fromSource(source, WatermarkStrategy.noWatermarks(), inputTopicName)
            .name(inputTopicName)
            .uid(inputTopicName);

        stream
            .keyBy(ignored -> "sameKeyAlways")
            .addSink(new CollectSink());

        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        if (fromCheckpoint) {
            jobGraph.setSavepointRestoreSettings(
                SavepointRestoreSettings.forPath(lastCheckpointPath));
            System.out.print("Resuming from savepoint " + lastCheckpointPath + "\n");
            System.out.flush();
        }

        // Submit job. The cluster is already running (started by the @ClassRule), so it must not
        // be started again here: a second before() would launch another mini cluster that is
        // never shut down.
        ClusterClient<?> client = flinkCluster.getClusterClient();
        JobID jobId = client.submitJob(jobGraph).get();

        while (true) {
            JobStatus status = client.getJobStatus(jobId).get();
            System.out.print("Status: " + status + ", all sunked values: " + CollectSink.values + "\n");
            System.out.flush();

            if (createCheckpoint && status.equals(JobStatus.RUNNING)) {
                lastCheckpointPath = client
                    .stopWithSavepoint(jobId, false, checkpointFolder, SavepointFormatType.CANONICAL)
                    .get();
                System.out.print("Stopping with savepoint\n");
                System.out.flush();
                break;
            }

            if (status.equals(JobStatus.FINISHED)) {
                break;
            }

            // FAILED / CANCELED will never turn into FINISHED; fail fast instead of polling forever.
            if (status.isGloballyTerminalState()) {
                throw new IllegalStateException(
                    "Job " + jobId + " ended in unexpected terminal state " + status);
            }

            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /** Sink that appends every received value to a static, synchronized list. */
    private static class CollectSink implements SinkFunction<String> {
        public static final List<String> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(String value, SinkFunction.Context context) throws Exception {
            values.add(value);
        }
    }
}

jwtryg avatar Dec 07 '23 22:12 jwtryg

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

boring-cyborg[bot] avatar Dec 07 '23 22:12 boring-cyborg[bot]

@jwtryg Can you please rebase your PR to trigger CI?

MartijnVisser avatar Jan 18 '24 15:01 MartijnVisser

@mas-chen Do you also want to take a look at this one?

MartijnVisser avatar Apr 04 '24 12:04 MartijnVisser