[draft] [fix] [client] Fix ack failure when consumer is reconnecting
Motivation
When acknowledgmentGroupTime is set to 0, the ack command fails while the consumer is reconnecting: doImmediateAck returns a failed future as soon as the consumer has no active connection. See https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L361-L364
    private CompletableFuture<Void> doImmediateAck(MessageIdAdv msgId, AckType ackType, Map<String, Long> properties,
                                                   BitSetRecyclable bitSet) {
        ClientCnx cnx = consumer.getClientCnx();
        if (cnx == null) {
            return FutureUtil.failedFuture(new PulsarClientException
                    .ConnectException("Consumer connect fail! consumer state:" + consumer.getState()));
        }
        return newImmediateAckAndFlush(consumer.consumerId, msgId, bitSet, ackType, properties, cnx);
    }
Modifications
Send the ack after the consumer is connected, instead of failing it immediately while the consumer is reconnecting.
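The idea can be sketched in isolation as follows. This is only an illustration under stated assumptions, not the code in this PR: DeferredAckSketch, connectionOpened, connectionClosed and sentAcks are hypothetical names, and the list of sent acks stands in for the real ack command written to the broker connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a consumer; none of these names exist in Pulsar.
class DeferredAckSketch {
    // Acks that have been "sent", standing in for commands written to the connection.
    private final List<Long> sent = new ArrayList<>();
    // Completes when the consumer is connected; replaced after a disconnect.
    private CompletableFuture<Void> connected = new CompletableFuture<>();

    // Called when the (re)connection succeeds; releases any waiting acks.
    synchronized void connectionOpened() {
        connected.complete(null);
    }

    // Called when the connection is lost; later acks wait for the next connect.
    synchronized void connectionClosed() {
        if (connected.isDone()) {
            connected = new CompletableFuture<>();
        }
    }

    // Chains the ack onto the connection future instead of failing right away.
    synchronized CompletableFuture<Void> ack(long messageId) {
        return connected.thenRun(() -> {
            synchronized (this) {
                sent.add(messageId);
            }
        });
    }

    // Returns a copy of the acks sent so far.
    synchronized List<Long> sentAcks() {
        return new ArrayList<>(sent);
    }
}
```

In this sketch, an ack issued while disconnected returns a pending future that completes once connectionOpened() is called. A real change would probably also need a timeout, or a failure path for a consumer that is closed before it reconnects, which the sketch leaves out.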
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
Matching PR in forked repository
PR in forked repository: x