PIP-218: Consumer batchReceive() single partition every receive

Open congbobo184 opened this issue 3 years ago • 2 comments

Motivation

Currently, the client API offers no simple, easy-to-understand way to use acknowledgeCumulative.

If the user calls the receive() interface to consume a multi-partition topic, there are two ways to use acknowledgeCumulative correctly:

  1. Cache the latest messageId of each topic name locally.

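            // Remember the latest messageId received from each partition (topic name).
            Map<String, MessageId> map = new HashMap<>();

            while (true) {
                for (int i = 0; i < 1000; i++) {
                    Message<String> message = consumer.receive();
                    map.put(message.getTopicName(), message.getMessageId());
                }
                // Cumulatively acknowledge each partition up to its latest messageId.
                map.forEach((k, v) -> consumer.acknowledgeCumulativeAsync(v));
                map.clear();
            }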
    

    It is complex and difficult for users to understand.

  2. Invoke acknowledgeCumulative for every message.

            while (true) {
                Message<String> message = consumer.receive();
                consumer.acknowledgeCumulativeAsync(message);
            }
    

    This is simple, but acknowledging every message individually does not match the intent of 'acknowledgeCumulative', which makes it difficult for users to understand.
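Both approaches follow from the same constraint: a cumulative acknowledgment only covers the partition that the acknowledged message came from. The toy model below illustrates that constraint. It is a sketch under stated assumptions, not the Pulsar client: the class `PerPartitionCumulativeAckSketch`, its methods, and the partition names are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of per-partition cumulative acknowledgment (not Pulsar code).
public class PerPartitionCumulativeAckSketch {

    // partition name -> highest sequence id acknowledged cumulatively so far
    private final Map<String, Long> ackedUpTo = new HashMap<>();

    // Acknowledge every message of ONE partition up to and including the given id.
    public void acknowledgeCumulative(String partition, long id) {
        ackedUpTo.merge(partition, id, Math::max);
    }

    // A message counts as acknowledged only if its own partition was acknowledged far enough.
    public boolean isAcknowledged(String partition, long id) {
        return id <= ackedUpTo.getOrDefault(partition, -1L);
    }

    public static void main(String[] args) {
        PerPartitionCumulativeAckSketch acks = new PerPartitionCumulativeAckSketch();

        // Assume messages arrived in this order: p0:0, p1:0, p0:1.
        // Only the last received message (p0:1) is acknowledged cumulatively.
        acks.acknowledgeCumulative("my-topic-partition-0", 1L);

        System.out.println(acks.isAcknowledged("my-topic-partition-0", 0L)); // true
        System.out.println(acks.isAcknowledged("my-topic-partition-1", 0L)); // false
    }
}
```

In this model the message from partition 1 stays unacknowledged, which is why the first approach tracks one messageId per topic name.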

The same problem applies to batchReceive(). Therefore, we need a simple and easy-to-understand way to use acknowledgeCumulative.

Goal

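Because a single receive() call cannot benefit from cumulative acknowledgment, we mainly consider batchReceive(). We can make each batchReceive() call return messages from a single topic only, so that cumulatively acknowledging the last message of the batch covers the whole batch.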

        // consume messages from a multi-partition topic
        while (true) {
            Messages<String> messages = consumer.batchReceive();
            Message<String> finalMessage = null;
            for (Message<String> message : messages) {
                process(message);
                finalMessage = message;
            }
            if (finalMessage != null) {
                consumer.acknowledgeCumulativeAsync(finalMessage);
            }
        }

API Changes

Add a configuration option to BatchReceivePolicy.

    /**
     * If it is false, a single `batchReceive()` call can only receive messages from a single topic,
     * and the max messages and max size will not be strictly followed (default: true).
     */
    private final boolean messagesFromMultiTopicsEnabled;

Implementation

Change the batchReceive logic in ConsumerBase. If the peeked message belongs to a different topic than the messages already in the batch, return the batch result.

protected final void notifyPendingBatchReceivedCallBack(CompletableFuture<Messages<T>> batchReceiveFuture) {
    MessagesImpl<T> messages = getNewMessagesImpl();
    Message<T> msgPeeked = incomingMessages.peek();
    String topicName = null;
    while (msgPeeked != null && messages.canAdd(msgPeeked)) {
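        // one batch receive request can only receive messages from the same topic partition
        // (receiveSingleTopicMessageEnabled presumably corresponds to messagesFromMultiTopicsEnabled == false)
        if (receiveSingleTopicMessageEnabled) {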
            // get the first message's `topicName` to check if the following message peeked is the same topic message.
            if (messages.size() == 1) {
                topicName = messages.getMessageList().get(0).getTopicName();
            }
            // if the peeked message is not the same topic as the first message, return the batch result
            if (topicName != null && !topicName.equals(msgPeeked.getTopicName())) {
                break;
            }
        }
        Message<T> msg = incomingMessages.poll();
        if (msg != null) {
            messageProcessed(msg);
            Message<T> interceptMsg = beforeConsume(msg);
            messages.add(interceptMsg);
        }
        msgPeeked = incomingMessages.peek();
    }
    completePendingBatchReceive(batchReceiveFuture, messages);
}
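The cut-off rule above can be tried in isolation with a small simulation. The following is a minimal sketch, not the ConsumerBase code: the class `SingleTopicBatchSketch`, the `Msg` type, the `nextBatch` helper, and the topic names are hypothetical, and `maxMessages` stands in for the check done by `messages.canAdd(...)`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation of the single-topic batching rule (not Pulsar code).
public class SingleTopicBatchSketch {

    // Stand-in for a received message: only a topic name and a sequence id.
    public static final class Msg {
        public final String topic;
        public final long id;

        public Msg(String topic, long id) {
            this.topic = topic;
            this.id = id;
        }

        @Override
        public String toString() {
            return topic + ":" + id;
        }
    }

    // Drains up to maxMessages from the queue. When messagesFromMultiTopicsEnabled is false,
    // the batch ends as soon as the next queued message belongs to a different topic
    // than the first message of the batch.
    public static List<Msg> nextBatch(Deque<Msg> incoming, int maxMessages,
                                      boolean messagesFromMultiTopicsEnabled) {
        List<Msg> batch = new ArrayList<>();
        Msg peeked = incoming.peek();
        while (peeked != null && batch.size() < maxMessages) {
            if (!messagesFromMultiTopicsEnabled && !batch.isEmpty()
                    && !batch.get(0).topic.equals(peeked.topic)) {
                break; // different topic: return what has been collected so far
            }
            batch.add(incoming.poll());
            peeked = incoming.peek();
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<Msg> incoming = new ArrayDeque<>();
        incoming.add(new Msg("topic-partition-0", 1));
        incoming.add(new Msg("topic-partition-0", 2));
        incoming.add(new Msg("topic-partition-1", 1));
        incoming.add(new Msg("topic-partition-0", 3));

        // Each printed batch contains messages of exactly one topic.
        while (!incoming.isEmpty()) {
            System.out.println(nextBatch(incoming, 3, false));
        }
    }
}
```

With a limit of three messages, the first batch in this run holds only two, because the third queued message belongs to another partition. This is the sense in which the max messages and max size are not strictly followed when the option is disabled.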

Alternatives

Do not add any configuration and change the behavior unconditionally. The drawback is that users who fall back to an older client version would lose this behavior without being aware of it.

Anything else?

Follow-up Work

Optimize performance.

Mailing list discussion

https://lists.apache.org/thread/bd61h8vvnm9sf1n43wfh1t4vzsg26l2h

Are you willing to submit a PR?

  • [x] I'm willing to submit a PR!

congbobo184 • Oct 25 '22 04:10

Just one suggestion about the new option name

Maybe messagesFromMultiTopicsEnabled=true is more understandable. Also, please add a description of the behavior when the user disables it. It looks like, if messagesFromMultiTopicsEnabled=false, the max messages and max size will not be strictly followed.

codelipenghui • Oct 25 '22 14:10

Makes sense.

congbobo184 • Oct 27 '22 10:10

The issue had no activity for 30 days and was marked with the Stale label.

github-actions[bot] • Dec 02 '22 02:12