PIP-218: Consumer batchReceive() single partition every receive
Motivation
Currently, the client API does not make acknowledgeCumulative simple and easy to understand.
If the user uses the receive() interface to consume a multi-partition topic, there are two ways to use acknowledgeCumulative correctly:
- Cache the latest messageId of each topic locally, then acknowledge each one cumulatively.
Map<String, MessageId> map = new HashMap<>();
while (true) {
    for (int i = 0; i < 1000; i++) {
        Message<String> message = consumer.receive();
        // remember the last message ID seen on each topic partition
        map.put(message.getTopicName(), message.getMessageId());
    }
    // one cumulative ack per topic partition
    map.forEach((k, v) -> consumer.acknowledgeCumulativeAsync(v));
    map.clear();
}
This is complex and difficult for users to understand.
- Invoke acknowledgeCumulative for every message.
while (true) {
    Message<String> message = consumer.receive();
    consumer.acknowledgeCumulativeAsync(message);
}
This is simple, but because every message is acknowledged individually, it does not conform to the definition of 'acknowledgeCumulative', which makes it difficult for users to understand.
The same problem applies to batchReceive(). Therefore, we need a simple and easy-to-understand way to use cumulative acknowledgment.
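To see why one cumulative ack is not enough when messages from several partitions are interleaved, the toy model below (Java 16+, no Pulsar dependency) assumes each partition tracks its own "acknowledged up to" offset and that a cumulative ack only advances the partition of the acknowledged message. `Msg`, `allAcked`, and `lastPerPartition` are hypothetical names used only for illustration; this is a simplified sketch, not the client's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of cumulative acknowledgment on a multi-partition topic.
// Assumption: each partition keeps its own "acknowledged up to" offset, and a
// cumulative ack only advances the partition of the acknowledged message.
public class CumulativeAckModel {

    // Hypothetical stand-in for a message ID: partition name plus offset.
    record Msg(String partition, long offset) {}

    // Applies the cumulative acks, then checks that every received message is covered.
    static boolean allAcked(List<Msg> received, List<Msg> acks) {
        Map<String, Long> ackedUpTo = new HashMap<>();
        for (Msg ack : acks) {
            ackedUpTo.merge(ack.partition(), ack.offset(), Long::max);
        }
        for (Msg m : received) {
            Long upTo = ackedUpTo.get(m.partition());
            if (upTo == null || m.offset() > upTo) {
                return false; // this partition was never acked far enough
            }
        }
        return true;
    }

    // Keeps the last message seen on each partition, like the per-topic map in the first approach.
    static List<Msg> lastPerPartition(List<Msg> received) {
        Map<String, Msg> last = new LinkedHashMap<>();
        for (Msg m : received) {
            last.put(m.partition(), m);
        }
        return new ArrayList<>(last.values());
    }

    public static void main(String[] args) {
        // Messages from two partitions arrive interleaved.
        List<Msg> received = List.of(
                new Msg("p-0", 1), new Msg("p-1", 1), new Msg("p-0", 2),
                new Msg("p-1", 2), new Msg("p-0", 3));
        Msg finalMessage = received.get(received.size() - 1);
        // Acking only the final message advances p-0; p-1 stays unacknowledged.
        System.out.println("ack final only: " + allAcked(received, List.of(finalMessage)));
        // Acking the last message of each partition covers every message.
        System.out.println("ack per partition: " + allAcked(received, lastPerPartition(received)));
    }
}
```

Running it prints false for the single final ack and true for one ack per partition, which is why the first approach needs a map keyed by topic name.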
Goal
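Because receive() returns one message at a time, it cannot show the value of cumulative acknowledgment, so we mainly consider batchReceive(). We can make every batchReceive() call return messages from a single topic, so that cumulatively acknowledging the last message of a batch acknowledges the whole batch.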
// consume the multi-partition messages
while (true) {
    Messages<String> messages = consumer.batchReceive();
    Message<String> finalMessage = null;
    for (Message<String> message : messages) {
        process(message);
        finalMessage = message;
    }
    // every message in the batch comes from one topic, so one cumulative ack covers the batch
    if (finalMessage != null) {
        consumer.acknowledgeCumulativeAsync(finalMessage);
    }
}
API Changes
Add a configuration option to BatchReceivePolicy.
/**
 * If false, a single `batchReceive()` call only returns messages from one topic,
 * so the max messages and max size limits are not strictly followed: a batch may be
 * returned before reaching them (default: true).
 */
private final boolean messagesFromMultiTopicsEnabled;
Implementation
Change the batchReceive logic in ConsumerBase: if the peeked message comes from a different topic than the first message in the batch, stop and return the batch result.
protected final void notifyPendingBatchReceivedCallBack(CompletableFuture<Messages<T>> batchReceiveFuture) {
    MessagesImpl<T> messages = getNewMessagesImpl();
    Message<T> msgPeeked = incomingMessages.peek();
    String topicName = null;
    while (msgPeeked != null && messages.canAdd(msgPeeked)) {
        // messagesFromMultiTopicsEnabled is the new BatchReceivePolicy option; when it is false,
        // one batch receive request only returns messages from a single topic partition
        if (!messagesFromMultiTopicsEnabled) {
            // record the first message's topic to check whether the following peeked messages share it
            if (messages.size() == 1) {
                topicName = messages.getMessageList().get(0).getTopicName();
            }
            // if the peeked message is from a different topic than the first message, return the batch result
            if (topicName != null && !topicName.equals(msgPeeked.getTopicName())) {
                break;
            }
        }
        Message<T> msg = incomingMessages.poll();
        if (msg != null) {
            messageProcessed(msg);
            Message<T> interceptMsg = beforeConsume(msg);
            messages.add(interceptMsg);
        }
        msgPeeked = incomingMessages.peek();
    }
    completePendingBatchReceive(batchReceiveFuture, messages);
}
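The peek/poll loop above can be exercised outside the client with a standalone sketch (Java 16+ for records). `Msg`, `drainBatch`, and `batchSizes` are hypothetical names, and the sketch replaces `messages.canAdd()` with a plain message-count check, ignoring byte size, timeouts, and interceptors.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Standalone sketch of the single-topic batching rule, with no Pulsar dependency.
// Only a message-count limit is modeled; byte size, timeouts, and interceptors are omitted.
public class SingleTopicBatchSketch {

    // Hypothetical queued message: the topic partition it came from plus a sequence number.
    record Msg(String topic, int seq) {}

    // Drains one batch from the head of the queue, mirroring the peek/poll loop.
    static List<Msg> drainBatch(Deque<Msg> incoming, int maxNumMessages, boolean messagesFromMultiTopicsEnabled) {
        if (maxNumMessages < 1) {
            throw new IllegalArgumentException("maxNumMessages must be at least 1");
        }
        List<Msg> batch = new ArrayList<>();
        Msg peeked = incoming.peekFirst();
        while (peeked != null && batch.size() < maxNumMessages) {
            // Stop before a message whose topic differs from the first message in the batch.
            if (!messagesFromMultiTopicsEnabled && !batch.isEmpty()
                    && !batch.get(0).topic().equals(peeked.topic())) {
                break;
            }
            batch.add(incoming.pollFirst());
            peeked = incoming.peekFirst();
        }
        return batch;
    }

    // Repeatedly drains batches until the queue is empty and returns their sizes.
    static List<Integer> batchSizes(List<Msg> arrivals, int maxNumMessages, boolean messagesFromMultiTopicsEnabled) {
        Deque<Msg> incoming = new ArrayDeque<>(arrivals);
        List<Integer> sizes = new ArrayList<>();
        while (!incoming.isEmpty()) {
            sizes.add(drainBatch(incoming, maxNumMessages, messagesFromMultiTopicsEnabled).size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Msg> arrivals = List.of(
                new Msg("p-0", 1), new Msg("p-0", 2), new Msg("p-1", 1),
                new Msg("p-1", 2), new Msg("p-0", 3));
        System.out.println("enabled:  " + batchSizes(arrivals, 4, true));  // [4, 1]
        System.out.println("disabled: " + batchSizes(arrivals, 4, false)); // [2, 2, 1]
    }
}
```

With the option disabled, the first batch stops at two messages even though four are allowed. This is the sense in which the max messages limit is not strictly followed, and it is the trade-off that lets one cumulative ack cover each batch.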
Alternatives
Don't add any configuration and change the behavior directly. Users would not be aware of the change, and the only way to keep the old behavior would be to fall back to an older client version.
Anything else?
Follow-up Work
Optimize performance.
Mailing list discussion
https://lists.apache.org/thread/bd61h8vvnm9sf1n43wfh1t4vzsg26l2h
Are you willing to submit a PR?
- [x] I'm willing to submit a PR!
Just one suggestion about the new option name: maybe messagesFromMultiTopicsEnabled=true is more understandable.
Also, add a description of the behavior when the user disables it.
It looks like if messagesFromMultiTopicsEnabled=false, the max messages and max size will not be strictly followed.
Makes sense.
The issue had no activity for 30 days, mark with Stale label.