[fix][tableview] fixed ack failure in ReaderImpl due to null messageId

Open heesung-sohn opened this issue 3 years ago • 0 comments

Fixes #

Master Issue: #

Motivation

https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java#L190

    @Override
    public CompletableFuture<Message<T>> readNextAsync() {
        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
        receiveFuture.whenComplete((msg, t) -> {
           if (msg != null) {
               consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
                   log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
                           getConsumer().getSubscription(), msg.getMessageId(), ex);
                   return null;
               });
           }
        });
        return receiveFuture;
    }

I see ack failures caused by a null message ID, because TableViewImpl releases the msg before the reader's acknowledgeCumulativeAsync call runs.

https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java#L176

2022-09-16T13:59:31,448 - WARN  - [pulsar-client-internal-39-1:ReaderImpl@184] - [persistent://public/default/test][reader-9c73a60e29] acknowledge message null cumulative fail.
org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Cannot handle message with null messageId
	at org.apache.pulsar.client.impl.ConsumerBase.validateMessageId(ConsumerBase.java:358) ~[pulsar-client-original-2.11.0-SNAPSHOT.jar:?]
	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:554) ~[pulsar-client-original-2.11.0-SNAPSHOT.jar:?]
	at org.apache.pulsar.client.impl.ReaderImpl.lambda$readNextAsync$3(ReaderImpl.java:183) ~[pulsar-client-original-2.11.0-SNAPSHOT.jar:?]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
	at org.apache.pulsar.client.impl.ConsumerBase.lambda$completePendingReceive$0(ConsumerBase.java:288) ~[pulsar-client-original-2.11.0-SNAPSHOT.jar:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]

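The root cause is that readNextAsync() returns receiveFuture itself instead of the future returned by receiveFuture.whenComplete(...). A caller such as TableViewImpl can then attach its own callback to the same future, and nothing guarantees that this callback runs after the ack callback.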
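The ordering issue can be reproduced with plain JDK futures. The following is a minimal sketch, not Pulsar code: the class name `CallbackOrderSketch`, the `run` helper, and the "ack" / "release" event names are hypothetical stand-ins for the reader's ack callback and the caller's release of the message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CallbackOrderSketch {

    /**
     * Attaches an "ack" callback (library side) and a "release" callback
     * (caller side), completes the source future, and returns the order
     * in which the two callbacks ran.
     */
    public static List<String> run(boolean returnChained) {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> receiveFuture = new CompletableFuture<>();

        // Library side: the ack callback hangs off the receive future.
        CompletableFuture<String> chained =
                receiveFuture.whenComplete((msg, t) -> events.add("ack"));

        // What the library hands back to its caller.
        CompletableFuture<String> returned = returnChained ? chained : receiveFuture;

        // Caller side: releases the message once the returned future completes.
        returned.thenAccept(msg -> events.add("release"));

        // Everything runs synchronously on this thread when the source completes.
        receiveFuture.complete("msg-1");
        return events;
    }

    public static void main(String[] args) {
        // Order here depends on CompletableFuture internals and is not specified.
        System.out.println("returning receiveFuture:  " + run(false));
        // The chained future completes only after the ack callback has run.
        System.out.println("returning chained future: " + run(true));
    }
}
```

When the chained future is returned, "release" can only run after "ack", because the chained future completes after the whenComplete action finishes. When receiveFuture itself is returned, both callbacks are dependents of the same future and their relative order is an implementation detail.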

Modifications

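Return the CompletableFuture produced by .whenComplete(... acknowledgeCumulativeAsync ...), so that callbacks attached by the caller run only after the ack callback has been invoked.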
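As a rough illustration of the shape of this change, here is a self-contained sketch that mirrors readNextAsync() with stand-in types. It is not the actual diff: `FakeMessage`, `FakeConsumer`, and `demo` are hypothetical, and the assumption that releasing a message clears its ID is modeled on the failure described above rather than taken from the Pulsar sources.

```java
import java.util.concurrent.CompletableFuture;

public class ReadNextAsyncSketch {

    /** Hypothetical stand-in for a message; release() is assumed to clear the ID. */
    static final class FakeMessage {
        private String messageId;
        FakeMessage(String messageId) { this.messageId = messageId; }
        String getMessageId() { return messageId; }
        void release() { messageId = null; }
    }

    /** Hypothetical stand-in for the consumer wrapped by the reader. */
    static final class FakeConsumer {
        final CompletableFuture<FakeMessage> pendingReceive = new CompletableFuture<>();
        String lastAckedId; // stays null until an ack succeeds

        CompletableFuture<FakeMessage> receiveAsync() {
            return pendingReceive;
        }

        CompletableFuture<Void> acknowledgeCumulativeAsync(FakeMessage msg) {
            if (msg.getMessageId() == null) {
                return CompletableFuture.failedFuture(
                        new IllegalArgumentException("Cannot handle message with null messageId"));
            }
            lastAckedId = msg.getMessageId();
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Returns the future produced by whenComplete, not the raw receive future. */
    static CompletableFuture<FakeMessage> readNextAsync(FakeConsumer consumer) {
        return consumer.receiveAsync().whenComplete((msg, t) -> {
            if (msg != null) {
                consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
                    System.err.println("acknowledge cumulative failed: " + ex);
                    return null;
                });
            }
        });
    }

    /** Simulates a caller that releases the message as soon as the read completes. */
    public static String demo() {
        FakeConsumer consumer = new FakeConsumer();
        readNextAsync(consumer).thenAccept(FakeMessage::release);
        consumer.pendingReceive.complete(new FakeMessage("1:0"));
        return consumer.lastAckedId;
    }

    public static void main(String[] args) {
        System.out.println("last acked id: " + demo());
    }
}
```

Because the caller's release is attached to the future returned by whenComplete, the ack callback has already read the message ID by the time the message is released.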

Verifying this change

  • [x] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Added a unit test to confirm that acknowledgeCumulativeAsync() gets called after the messageId null check.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • [ ] Dependencies (add or upgrade a dependency)
  • [ ] The public API
  • [ ] The schema
  • [ ] The default values of configurations
  • [ ] The binary protocol
  • [ ] The REST endpoints
  • [ ] The admin CLI options
  • [ ] Anything that affects deployment

Documentation

  • [ ] doc-required (Your PR needs to update docs and you will update later)

  • [x] doc-not-needed (Please explain why)

  • [ ] doc (Your PR contains doc changes)

  • [ ] doc-complete (Docs have been already added)

Matching PR in forked repository

PR in forked repository: https://github.com/heesung-sn/pulsar/pull/9


heesung-sohn, Sep 20 '22 04:09