pulsar
pulsar copied to clipboard
[feat][pip] PIP-426: Enable Consumer Throttling and Accurate Unacknowledged Message Tracking for Exclusive and Failover Subscriptions
Fixes https://github.com/apache/pulsar/issues/24159
Main Issue: https://github.com/apache/pulsar/issues/24159
Motivation
- I have tried to fix the issue by https://github.com/apache/pulsar/pull/24376. However, that PR doesn't support features about cumulative ack, batching mode, and transaction. So I want to improve it further.
- I have found another issue that flowcontrol of exclusive or failover consumer is not work. Since the issues are highly correlated, I want to fix them at the same time.
We can reproduce flow control issue by this unit test
@Test(timeOut = 30000)
public void testMaxUnackedMessagesOnExclusiveConsumer() throws Exception {
final String topicName = testTopic + System.currentTimeMillis();
final String subscriberName = "test-sub-exclusive" + System.currentTimeMillis();
final int unackMsgAllowed = 100;
final int receiverQueueSize = 10;
final int totalProducedMsgs = 300;
ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
.ackTimeout(1, TimeUnit.MINUTES)
.subscriptionType(SubscriptionType.Exclusive);
@Cleanup
Consumer<String> consumer = consumerBuilder.subscribe();
// 1) Produced Messages
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
producer.send(message);
}
// 2) Unlimited, so all messages can be consumed
int count = 0;
List<Message<String>> list = new ArrayList<>(totalProducedMsgs);
for (int i = 0; i < totalProducedMsgs; i++) {
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
if (message == null) {
break;
}
count++;
list.add(message);
}
assertEquals(count, totalProducedMsgs);
list.forEach(message -> {
try {
consumer.acknowledge(message);
} catch (PulsarClientException e) {
}
});
// 3) Set restrictions, so only part of the data can be consumed
waitCacheInit(topicName);
admin.topics().setMaxUnackedMessagesOnConsumer(topicName, unackMsgAllowed);
Awaitility.await().untilAsserted(()
-> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName)));
assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), unackMsgAllowed);
// 4) consumer can only consume 100 messages
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
producer.send(message);
}
int consumerCounter = 0;
Message<String> message = null;
for (int i = 0; i < totalProducedMsgs; i++) {
try {
Message<String> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
if (msg == null) {
break;
}
message = msg;
++consumerCounter;
} catch (PulsarClientException e) {
break;
}
}
assertEquals(consumerCounter, unackMsgAllowed);
consumer.acknowledgeCumulative(message.getMessageId());
consumerCounter = 0;
for (int i = 0; i < totalProducedMsgs - unackMsgAllowed; i++) {
try {
message = consumer.receive(500, TimeUnit.MILLISECONDS);
if (message == null) {
break;
}
++consumerCounter;
} catch (PulsarClientException e) {
break;
}
}
assertEquals(consumerCounter, unackMsgAllowed);
}
Modifications
- Reused and extended the
pendingAcksmechanism in theConsumerclass to support exclusive and failover subscriptions. - Removed the restriction that
pendingAcksonly works in individual ack mode. - Reused
PendingAckHandleImplfor transaction support after removing the dependency onSubscription.isIndividualAckMode.
Verifying this change
- [X] Make sure that the change passes the CI checks.
This change is already covered by existing tests, such as (please describe tests).
- MaxUnackedMessagesTest#testMaxUnackedMessagesOnConsumer
This change added tests and can be verified as follows:
- MaxUnackedMessagesTest#testMaxUnackedMessagesOnExclusiveConsumer
- MaxUnackedMessagesTest#testMaxUnackedMessagesOnFailOverConsumer
- PersistentDispatcherSingleActiveConsumerTest#testUnackedMessages
- PersistentDispatcherSingleActiveConsumerTest#testUnackedMessagesWithTransaction
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ]
doc - [ ]
doc-required - [X]
doc-not-needed - [ ]
doc-complete
Matching PR in forked repository
PR in forked repository: