[BUG] Cumulative Acknowledgement is not happening in the flink-connector-pulsar
Describe the bug
We are using the following library versions:
- Flink 1.15.0
- Pulsar 2.8.2
- flink-connector-pulsar 1.15.0
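To Reproduce
Steps to reproduce the behavior:
- TestJob:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.joda.time.DateTime;

// PULSAR_CLIENT_AUTH_TOKEN, PULSAR_CERT_PATH, PULSAR_HOST and PULSAR_ADMIN_HOST are
// constants defined elsewhere. TestProcessFunction, TestSink and
// JodaDateTimeSerializer are likewise not shown here.
public class TestJob {
    public static void main(String[] args) throws Exception {
        String authParams = String.format("token:%s", PULSAR_CLIENT_AUTH_TOKEN);
        String topicPattern = "persistent://a/b/test";
        List<String> topics = new ArrayList<>();
        topics.add(topicPattern);

        // Client authentication, TLS and timeout settings.
        Properties properties = new Properties();
        properties.setProperty(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME.key(), AuthenticationToken.class.getName());
        properties.setProperty(PulsarOptions.PULSAR_AUTH_PARAMS.key(), authParams);
        properties.setProperty(PulsarOptions.PULSAR_TLS_TRUST_CERTS_FILE_PATH.key(), PULSAR_CERT_PATH);
        properties.setProperty(PulsarOptions.PULSAR_SERVICE_URL.key(), PULSAR_HOST);
        properties.setProperty(PulsarOptions.PULSAR_CONNECT_TIMEOUT.key(), "600000");
        properties.setProperty(PulsarOptions.PULSAR_READ_TIMEOUT.key(), "600000");
        properties.setProperty(PulsarOptions.PULSAR_REQUEST_TIMEOUT.key(), "600000");
        // Ask the source to acknowledge messages automatically.
        properties.setProperty(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE.key(), Boolean.TRUE.toString());

        PulsarSource<String> src = PulsarSource.builder()
                .setServiceUrl(PULSAR_HOST)
                .setAdminUrl(PULSAR_ADMIN_HOST)
                .setProperties(properties)
                .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000000L)
                .setStartCursor(StartCursor.earliest())
                .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
                .setSubscriptionName("test-subscription-local")
                .setSubscriptionType(SubscriptionType.Failover)
                .setConsumerName("test-consumer-local")
                .setTopics(topics)
                .build();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setAutoWatermarkInterval(0L);
        env.addDefaultKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);

        String sourceName = "pulsar-source-local";
        DataStream<String> stream = env.fromSource(src, WatermarkStrategy.noWatermarks(), sourceName)
                .setParallelism(1)
                .uid(sourceName)
                .name(sourceName);

        stream.process(new TestProcessFunction()).setParallelism(1)
                .uid("test-job-pf")
                .name("test-job-pf")
                .addSink(new TestSink()).setParallelism(1)
                .uid("sink-job")
                .name("sink-job");

        // Submit the job.
        env.execute();
    }
}
```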
- Messages in the topic: M1 ... M10

Expected behavior
Once messages have been acknowledged, they should not be delivered again.

Instead, when we restart the job after confirming that it has processed all the messages, the same messages are delivered again. We saw that the cumulativeAcknowledgement() function is invoked every time, with or without checkpointing enabled.
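
To make the expectation concrete, here is a toy in-memory model of what we understand cumulative acknowledgement to mean for a single subscription. It is plain Java with no Pulsar or Flink involved, and the class and method names (`CumulativeAckModel`, `ackUpTo`, `deliverOnConnect`) are made up for illustration; they are not connector or client APIs. The assumption is that acknowledging message Mk moves the subscription's position past M1..Mk, so a consumer that reconnects only receives what comes after it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one subscription's position on one topic; not Pulsar code. */
public class CumulativeAckModel {
    private final int messageCount;     // the topic holds M1..M<messageCount>
    private int lastAcknowledged = 0;   // highest cumulatively acknowledged index; 0 = none

    public CumulativeAckModel(int messageCount) {
        this.messageCount = messageCount;
    }

    /** Cumulative ack: acknowledging Mk also acknowledges every message before it. */
    public void ackUpTo(int k) {
        if (k < 1 || k > messageCount) {
            throw new IllegalArgumentException("no such message: M" + k);
        }
        // An older ack arriving late must not move the position backwards.
        lastAcknowledged = Math.max(lastAcknowledged, k);
    }

    /** Messages a consumer receives when it connects (or reconnects after a restart). */
    public List<String> deliverOnConnect() {
        List<String> delivered = new ArrayList<>();
        for (int i = lastAcknowledged + 1; i <= messageCount; i++) {
            delivered.add("M" + i);
        }
        return delivered;
    }

    public static void main(String[] args) {
        CumulativeAckModel subscription = new CumulativeAckModel(10);
        System.out.println(subscription.deliverOnConnect()); // first run: M1 through M10
        subscription.ackUpTo(10);                            // job processed and acked everything
        System.out.println(subscription.deliverOnConnect()); // after restart: []
    }
}
```

In this model the second call returns an empty list, which is the behavior we expect after a restart. What we observe with the connector corresponds to the first call's output being delivered on every restart.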