pulsar-flink icon indicating copy to clipboard operation
pulsar-flink copied to clipboard

[BUG] Cumulative Acknowledgement is not happening in the flink-connector-pulsar

Open anoop-khandelwal opened this issue 3 years ago • 0 comments

Describe the bug We are using the below libaries- Flink-1.15.0 Pulsar- 2.8.2 flink-connector-pulsar=1.15.0

To Reproduce Steps to reproduce the behavior:

  1. TestJob `public class TestJob { public static void main(String[] args) { String authParams = String.format("token:%s", PULSAR_CLIENT_AUTH_TOKEN); String topicPattern = "persistent://a/b/test"; List<String> topics = new ArrayList(); topics.add(topicPattern);

     Properties properties = new Properties();
     properties.setProperty(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME.key(), AuthenticationToken.class.getName());
     properties.setProperty(PulsarOptions.PULSAR_AUTH_PARAMS.key(), authParams);
     properties.setProperty(PulsarOptions.PULSAR_TLS_TRUST_CERTS_FILE_PATH.key(),PULSAR_CERT_PATH);
     properties.setProperty(PulsarOptions.PULSAR_SERVICE_URL.key(), PULSAR_HOST);
     properties.setProperty(PulsarOptions.PULSAR_CONNECT_TIMEOUT.key(),"600000");
     properties.setProperty(PulsarOptions.PULSAR_READ_TIMEOUT.key(),"600000");
     properties.setProperty(PulsarOptions.PULSAR_REQUEST_TIMEOUT.key(),"600000");
     properties.setProperty(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE.key(),Boolean.TRUE.toString());
    
     PulsarSource<String> src = PulsarSource.builder()
             .setServiceUrl(PULSAR_HOST)
             .setAdminUrl(PULSAR_ADMIN_HOST)
             .setProperties(properties)
             .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS,10000000L)
             .setStartCursor(StartCursor.earliest())
             .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
             .setSubscriptionName("test-subscription-local")
             .setSubscriptionType(SubscriptionType.Failover)
             .setConsumerName(String.format("test-consumer-local"))
             .setTopics(topics).build();
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.getConfig().setAutoWatermarkInterval(0L);
     env.addDefaultKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
     String sourceName = String.format("pulsar-source-local");
     DataStream<String> stream =  env.fromSource(src, WatermarkStrategy.noWatermarks(),sourceName)
             .setParallelism(1)
             .uid(sourceName)
             .name(sourceName);
    
     stream
             .process(new TestProcessFunction()).setParallelism(1)
             .uid(String.format("test-job-pf"))
             .name(String.format("test-job-pf"))
             .addSink(new TestSink()).setParallelism(1)
             .uid(String.format("sink-job"))
             .name(String.format("sink-job"));
    

    } }`

Messages = M1.....M10 Expected behavior Upon the acknowledgment, messages should not be appearing again. Upon job restart after ensuring it has processed all the messages, the messages keep coming back. We saw that the cumulativeAcknowledgement() function is invoked all the time with or without checkpoint enabled.

anoop-khandelwal avatar Jul 28 '22 10:07 anoop-khandelwal