[fix][broker] Fix broker reducing the unacked message count incorrectly

congbobo184 opened this issue 3 years ago

Issue: the broker reduces `unackedCount` on the wrong consumer. Steps to reproduce:

        for (int i = 0; i < 5; i++) {
            producer.newMessage().value(("Hello Pulsar - " + i).getBytes()).sendAsync();
        }

        // consumer-1 receives the 5 messages of the batch
        List<MessageId> list = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            list.add(consumer1.receive().getMessageId());
        }

        // consumer-1 negatively acks, which redelivers the batch
        consumer1.negativeAcknowledge(list.get(0));

        // consumer-2 receives the messages that consumer-1 redelivered
        for (int i = 0; i < 5; i++) {
            consumer2.receive().getMessageId();
        }

        // consumer-1 acks two messages of the batch
        consumer1.acknowledge(list.get(1));
        consumer1.acknowledge(list.get(2));

        // consumer-2 redelivers the rest of the messages
        consumer2.negativeAcknowledge(list.get(1));

        // closing consumer-1 redelivers the remaining messages to consumer-2
        consumer1.close();

        // consumer-2 receives and acks the remaining 3 messages
        for (int i = 0; i < 3; i++) {
            consumer2.acknowledge(consumer2.receive().getMessageId());
        }

        // consumer-2 receives nothing more: every message in the batch has been acked
        Message<byte[]> message = consumer2.receive(1, TimeUnit.SECONDS);
        assertNull(message);

        // consumer-2's unacked message count should drop to 0
        Awaitility.await().until(() -> getPulsar().getBrokerService().getTopic(topicName, false)
                .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages() == 0);

With the current code, `getUnackedMessages()` returns 2 instead of 0.

Motivation

Look up the correct consumer and reduce the unacked message count on that consumer.

Modifications

Change `getAckedCountForBatchIndexLevelEnabled` to use `ownerConsumer` when checking the pending-ack messages.
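To illustrate why charging the acking consumer instead of the owner consumer skews the counters, here is a minimal toy model. It is a sketch under stated assumptions, not the real `getAckedCountForBatchIndexLevelEnabled` code: `UnackedCountSketch`, `SimConsumer`, `Position`, `ownerOf`, `ackBatchIndexes` and `runScenario` are hypothetical names, and the scenario collapses the reproduction above (the consumer-2 negative ack and the consumer-1 close) into a single redelivery of the batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of broker-side unacked bookkeeping. All names are hypothetical;
// this is NOT Pulsar's broker code.
public class UnackedCountSketch {

    // One batched entry, identified by ledger id and entry id.
    record Position(long ledgerId, long entryId) {}

    static final class SimConsumer {
        // Entries dispatched to this consumer -> batch indexes not yet acked.
        final Map<Position, Integer> pendingAcks = new HashMap<>();
        int unackedMessages;
    }

    private final List<SimConsumer> consumers = new ArrayList<>();
    private final boolean useOwnerConsumer; // true models the behavior after the fix

    UnackedCountSketch(boolean useOwnerConsumer) {
        this.useOwnerConsumer = useOwnerConsumer;
    }

    SimConsumer addConsumer() {
        SimConsumer c = new SimConsumer();
        consumers.add(c);
        return c;
    }

    void dispatch(SimConsumer c, Position pos, int batchSize) {
        c.pendingAcks.put(pos, batchSize);
        c.unackedMessages += batchSize;
    }

    // Moves the still-unacked part of an entry from one consumer to another.
    void redeliver(SimConsumer from, SimConsumer to, Position pos) {
        Integer remaining = from.pendingAcks.remove(pos);
        if (remaining == null) {
            return;
        }
        from.unackedMessages -= remaining;
        dispatch(to, pos, remaining);
    }

    // The consumer whose pending acks currently hold the entry, or null.
    SimConsumer ownerOf(Position pos) {
        for (SimConsumer c : consumers) {
            if (c.pendingAcks.containsKey(pos)) {
                return c;
            }
        }
        return null;
    }

    // `acker` acknowledges `count` batch indexes of the entry at `pos`.
    void ackBatchIndexes(SimConsumer acker, Position pos, int count) {
        SimConsumer owner = ownerOf(pos);
        if (owner != null) {
            int left = owner.pendingAcks.get(pos) - count;
            if (left <= 0) {
                owner.pendingAcks.remove(pos);
            } else {
                owner.pendingAcks.put(pos, left);
            }
        }
        // Buggy variant charges whoever sent the ack; fixed variant charges the owner.
        SimConsumer charged = useOwnerConsumer ? owner : acker;
        if (charged != null) {
            charged.unackedMessages -= count;
        }
    }

    // Simplified scenario: returns {consumer-1 count, consumer-2 count}.
    static int[] runScenario(boolean fixed) {
        UnackedCountSketch sub = new UnackedCountSketch(fixed);
        SimConsumer c1 = sub.addConsumer();
        SimConsumer c2 = sub.addConsumer();
        Position entry = new Position(1, 0);

        sub.dispatch(c1, entry, 5);        // one entry carrying a batch of 5 messages
        sub.redeliver(c1, c2, entry);      // the whole batch moves to consumer-2
        sub.ackBatchIndexes(c1, entry, 2); // consumer-1 acks 2 indexes it no longer owns
        sub.ackBatchIndexes(c2, entry, 3); // consumer-2 acks the remaining 3
        return new int[] {c1.unackedMessages, c2.unackedMessages};
    }

    public static void main(String[] args) {
        System.out.println("buggy: " + Arrays.toString(runScenario(false))); // buggy: [-2, 2]
        System.out.println("fixed: " + Arrays.toString(runScenario(true)));  // fixed: [0, 0]
    }
}
```

In this model the buggy variant leaves consumer-1 at -2 and consumer-2 at 2, mirroring the two symptoms discussed in this PR (a negative count and a leftover count of 2), while looking up the owner consumer brings both counters back to 0.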

Verifying this change

Added a test that reproduces the scenario above.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

congbobo184, Aug 09 '22

@congbobo184 Would it be possible to describe the impact of the bug that was fixed by this PR? The current description covers implementation-level details of the issue. However, most Pulsar users don't understand the implementation level, so it would be useful if the externally observable behavior could be explained. What externally observable bug was fixed by this PR?

lhotari, Aug 15 '22

Hi @congbobo184, would you like to help cherry-pick this PR to branch-2.9?

mattisonchao, Aug 25 '22

@lhotari This PR fixes cases where a consumer's `unackedCount` can become negative.

congbobo184, Sep 07 '22