[fix][tableview] fixed ack failure in ReaderImpl due to null messageId
Fixes #
Master Issue: #
Motivation
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java#L190
@Override
public CompletableFuture<Message<T>> readNextAsync() {
    CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
    receiveFuture.whenComplete((msg, t) -> {
        if (msg != null) {
            consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
                log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
                        getConsumer().getSubscription(), msg.getMessageId(), ex);
                return null;
            });
        }
    });
    return receiveFuture;
}
I see ack failures caused by a null message ID, because TableViewImpl releases the message before the reader's acknowledgeCumulativeAsync call runs.
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java#L176
2022-09-16T13:59:31,448 - WARN - [pulsar-client-internal-39-1:ReaderImpl@184] - [persistent://public/default/test][reader-9c73a60e29] acknowledge message null cumulative fail.
org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Cannot handle message with null messageId
at org.apache.pulsar.client.impl.ConsumerBase.validateMessageId(ConsumerBase.java:358) ~[pulsar-client-original-2.11.0-SNAPSHOT.jar:?]
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:554) ~[pulsar-client-original-2.11.0-SNAPSHOT.jar:?]
at org.apache.pulsar.client.impl.ReaderImpl.lambda$readNextAsync$3(ReaderImpl.java:183) ~[pulsar-client-original-2.11.0-SNAPSHOT.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
at org.apache.pulsar.client.impl.ConsumerBase.lambda$completePendingReceive$0(ConsumerBase.java:288) ~[pulsar-client-original-2.11.0-SNAPSHOT.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
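The root cause is that readNextAsync() returns receiveFuture itself instead of the future produced by receiveFuture.whenComplete(...), so a callback that the caller chains on the returned future is not guaranteed to run after the acknowledgement callback.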
Modifications
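Return the CompletableFuture produced by .whenComplete(... acknowledgeCumulativeAsync ...), so the acknowledgement is issued before any callback chained on the returned future runs.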
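The ordering difference can be reproduced with plain CompletableFuture, without any Pulsar classes. The following is a minimal sketch, not the actual patch: FakeMessage, AckOrderingSketch, and both readNext... helpers are hypothetical names, and the "ack" is only recorded as a string. It assumes the caller releases the message in a callback chained on the returned future, as described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AckOrderingSketch {

    // Hypothetical stand-in for a message whose id becomes null once released.
    static final class FakeMessage {
        private String messageId = "1:0";
        String getMessageId() { return messageId; }
        void release() { messageId = null; }
    }

    // Original shape: the ack callback is attached, but the source future is returned.
    static CompletableFuture<FakeMessage> readNextReturningSource(
            CompletableFuture<FakeMessage> receiveFuture, List<String> events) {
        receiveFuture.whenComplete((msg, t) -> {
            if (msg != null) {
                events.add("ack " + msg.getMessageId());
            }
        });
        return receiveFuture;
    }

    // Fixed shape: the dependent future created by whenComplete is returned,
    // so it completes only after the ack callback has run.
    static CompletableFuture<FakeMessage> readNextReturningDependent(
            CompletableFuture<FakeMessage> receiveFuture, List<String> events) {
        return receiveFuture.whenComplete((msg, t) -> {
            if (msg != null) {
                events.add("ack " + msg.getMessageId());
            }
        });
    }

    // Records the order of "ack" and "release" for one simulated read.
    static List<String> run(boolean fixed) {
        List<String> events = new ArrayList<>();
        CompletableFuture<FakeMessage> receiveFuture = new CompletableFuture<>();
        CompletableFuture<FakeMessage> returned = fixed
                ? readNextReturningDependent(receiveFuture, events)
                : readNextReturningSource(receiveFuture, events);
        // Caller side (assumed): handle the message, then release it.
        returned.thenAccept(msg -> {
            events.add("release");
            msg.release();
        });
        // Simulate the consumer delivering a message.
        receiveFuture.complete(new FakeMessage());
        return events;
    }

    public static void main(String[] args) {
        System.out.println("fixed: " + run(true));
        System.out.println("original: " + run(false));
    }
}
```

With the fixed shape the run prints `fixed: [ack 1:0, release]`, because the caller's callback depends on the future that whenComplete returns. With the original shape both callbacks depend on the same source future, the CompletableFuture contract does not specify their relative order, and the release can run first, which leaves the ack with a null message ID.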
Verifying this change
- [x] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
- Added a unit test to confirm that acknowledgeCumulativeAsync() gets called after the messageId null check.
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] Anything that affects deployment
Documentation
- [ ] doc-required (Your PR needs to update docs and you will update later)
- [x] doc-not-needed (Please explain why)
- [ ] doc (Your PR contains doc changes)
- [ ] doc-complete (Docs have been already added)
Matching PR in forked repository
PR in forked repository: https://github.com/heesung-sn/pulsar/pull/9