
[draft] [fix] [client] Fix ack failure when consumer is reconnecting

Open poorbarcode opened this issue 1 year ago • 0 comments

Motivation

When acknowledgmentGroupTime is set to 0, the ack command fails while the consumer is reconnecting: doImmediateAck returns a failed future as soon as it finds no connection. See https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L361-L364

private CompletableFuture<Void> doImmediateAck(MessageIdAdv msgId, AckType ackType, Map<String, Long> properties,
                                               BitSetRecyclable bitSet) {
    ClientCnx cnx = consumer.getClientCnx();

    if (cnx == null) {
        return FutureUtil.failedFuture(new PulsarClientException
                .ConnectException("Consumer connect fail! consumer state:" + consumer.getState()));
    }
    return newImmediateAckAndFlush(consumer.consumerId, msgId, bitSet, ackType, properties, cnx);
}

Modifications

Send the ack after the consumer has reconnected, instead of failing it immediately when no connection is available.
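
The idea can be sketched in isolation as follows. This is only an illustration under stated assumptions, not the change made in this PR: `DeferredAckSketch`, `Connection`, `Consumer`, `onConnected`, `onDisconnected`, and `sendAck` are hypothetical stand-ins and not Pulsar classes or methods. The sketch keeps a future that completes once a connection exists and chains every ack onto it, so an ack issued during a reconnect waits instead of failing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: none of these types are part of the Pulsar client.
final class DeferredAckSketch {

    /** Stand-in for a live broker connection; it only records what was sent. */
    static final class Connection {
        final List<String> sent = new ArrayList<>();

        CompletableFuture<Void> sendAck(String messageId) {
            sent.add(messageId);
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Stand-in for a consumer whose connection is absent while reconnecting. */
    static final class Consumer {
        // Completed while connected, pending while (re)connecting.
        private CompletableFuture<Connection> connected = new CompletableFuture<>();

        synchronized void onConnected(Connection cnx) {
            if (connected.isDone()) {
                connected = CompletableFuture.completedFuture(cnx);
            } else {
                // Releases every ack that was queued during the reconnect.
                connected.complete(cnx);
            }
        }

        synchronized void onDisconnected() {
            if (connected.isDone()) {
                connected = new CompletableFuture<>();
            }
        }

        /** Sends the ack now if connected, otherwise once the connection is back. */
        synchronized CompletableFuture<Void> ack(String messageId) {
            return connected.thenCompose(cnx -> cnx.sendAck(messageId));
        }
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        CompletableFuture<Void> ack = consumer.ack("msg-1"); // issued while disconnected
        System.out.println("done before connect: " + ack.isDone());

        consumer.onConnected(new Connection());
        System.out.println("done after connect: " + ack.isDone());
    }
}
```

A real fix would also need to bound the wait (for example with a timeout) and fail pending acks when the consumer is closed, which this sketch leaves out.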

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository: x

poorbarcode, Jan 18 '24 17:01