[fix][client] Messages with inconsistent consumer epochs are not filtered when using batch receive and trigger timeout.
Motivation
Messages with inconsistent consumer epochs are not filtered out when batch receive is used and the batch timeout is triggered.
Modifications
- Add consumer epoch on notifyPendingBatchReceivedCallBack.
- Reuse notifyPendingBatchReceivedCallBack logic.
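The idea behind these modifications can be sketched as follows. This is a minimal illustration, not the actual ConsumerImpl code: the names EpochMessage, EpochFilteringQueue, bumpEpoch, and drainBatch are hypothetical, and the real client's epoch comparison may include extra conditions. The point is that one shared drain routine serves both the "batch is full" path and the "timeout fired" path, so the epoch check cannot be skipped on either one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a message tagged with the consumer epoch
// it was dispatched under.
final class EpochMessage {
    final String payload;
    final long consumerEpoch;

    EpochMessage(String payload, long consumerEpoch) {
        this.payload = payload;
        this.consumerEpoch = consumerEpoch;
    }
}

// Hypothetical model of the batch-receive path; not the real ConsumerImpl.
final class EpochFilteringQueue {
    private final Queue<EpochMessage> incoming = new ArrayDeque<>();
    private long currentEpoch = 0;

    void offer(EpochMessage msg) {
        incoming.add(msg);
    }

    // Models redeliverUnacknowledgedMessages advancing the consumer epoch.
    void bumpEpoch() {
        currentEpoch++;
    }

    // Single drain routine reused by every path that completes a pending
    // batch receive, including the timeout path.
    List<EpochMessage> drainBatch(int maxMessages) {
        List<EpochMessage> batch = new ArrayList<>();
        while (batch.size() < maxMessages) {
            EpochMessage msg = incoming.poll();
            if (msg == null) {
                break; // queue is empty
            }
            if (msg.consumerEpoch < currentEpoch) {
                continue; // stale epoch: drop it instead of returning it
            }
            batch.add(msg);
        }
        return batch;
    }
}
```

With one message offered at epoch 0, an epoch bump, and a second message offered at epoch 1, drainBatch returns only the second message.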
Verifying this change
The testBatchReceiveRedeliveryAddEpoch unit test covers this scenario.
Documentation
- [x] doc-not-needed
Matching PR in forked repository
https://github.com/shibd/pulsar/pull/12
/pulsarbot run-failure-checks
@nodece Thanks, all comments fixed, PTAL.
/pulsarbot run-failure-checks
@congbobo184 Do you remember why the consumer epoch check happened after polling the messages from the receiver queue?
@shibd @congbobo184 Can we move it to the beginning of message receiving?
https://github.com/apache/pulsar/blob/69f3f7471fa6faf24d4776d65e0509538c105d37/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1271-L1285
It looks like if a message has an invalid epoch, we don't need to add it to the receiver queue at all. Adding it also introduces a client-side stats issue: users never see that message, but the stats show the received message count increasing.
https://github.com/apache/pulsar/blob/69f3f7471fa6faf24d4776d65e0509538c105d37/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1628-L1644
Messages with an invalid epoch should be treated as skipped messages, like here:
https://github.com/apache/pulsar/blob/69f3f7471fa6faf24d4776d65e0509538c105d37/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1559
It also makes sense to add skipped messages to ConsumerStats.
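A rough sketch of the stats concern, assuming a hypothetical EpochAwareStats class (the real ConsumerStats interface is not shown in this thread and may look different): if the epoch is checked before the message is counted and enqueued, stale messages land in a separate "skipped" counter and the "received" counter only reflects messages the user can actually see.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch; names and structure are assumptions for illustration.
final class EpochAwareStats {
    private final LongAdder received = new LongAdder();
    private final LongAdder skipped = new LongAdder();
    private final AtomicLong currentEpoch = new AtomicLong(0);

    // Models the epoch advancing on redelivery.
    void bumpEpoch() {
        currentEpoch.incrementAndGet();
    }

    // Returns true if the message should be enqueued for the user.
    boolean onMessage(long messageEpoch) {
        if (messageEpoch < currentEpoch.get()) {
            skipped.increment(); // never shown to the user, so not "received"
            return false;
        }
        received.increment();
        return true;
    }

    long received() {
        return received.sum();
    }

    long skipped() {
        return skipped.sum();
    }
}
```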
@codelipenghui Because if we check the epoch before polling the message from the receiver queue, and the user invokes redeliverUnacknowledgedMessages at the same time as the client checks the message epoch, a message with an invalid epoch can still be offered to the queue. The user can then still receive a message with a smaller epoch.
> @codelipenghui Because if we check the epoch before polling the message from the receiver queue, and the user invokes redeliverUnacknowledgedMessages at the same time as the client checks the message epoch, a message with an invalid epoch can still be offered to the queue. The user can then still receive a message with a smaller epoch.

It looks like a race condition here. Is it possible to introduce a lock or use the same thread for messageReceived and redeliverUnacknowledgedMessages to resolve the issue?
My point is that we should keep messages with an invalid epoch out of the queue in the first place, so that we don't need to handle this case separately in receive() and batchReceive(). And as I described in https://github.com/apache/pulsar/pull/17318#issuecomment-1250524811, it also distorts client stats.
> It looks like a race condition here. Is it possible to introduce a lock or use the same thread for messageReceived and redeliverUnacknowledgedMessages to resolve the issue?

@codelipenghui It seems we cannot judge the consumer epoch before offering to the incoming queue, because when the incoming queue already holds messages and the user invokes redeliverUnacknowledgedMessages, those messages, now with an invalid epoch, will still be returned to the user.
It looks like the consumer epoch should be judged before returning the message to the user.
@congbobo184 However, even so, in extreme cases, we cannot avoid the problem described below.
> @codelipenghui Because if we check the epoch before polling the message from the receiver queue, and the user invokes redeliverUnacknowledgedMessages at the same time as the client checks the message epoch, a message with an invalid epoch can still be offered to the queue. The user can then still receive a message with a smaller epoch.

Because the user may call receive and redeliverUnacknowledgedMessages from multiple threads, the user may still receive old-epoch messages in this case. (Note: this is not good usage, but it may exist.)
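    // Assumes `consumer` is a Consumer<byte[]> and the enclosing method
    // declares `throws PulsarClientException`.
    while (true) {
        Message<byte[]> msg = consumer.receive();
        // This message may be of an invalid epoch.
        if (needCumulativeAck) {
            try {
                consumer.acknowledgeCumulative(msg);
            } catch (PulsarClientException e) {
                // Redelivery runs on a separate thread, so it races with receive().
                new Thread(() -> consumer.redeliverUnacknowledgedMessages()).start();
            }
        }
    }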
@shibd Yes, if the user calls receive and redeliverUnacknowledgedMessages from multiple threads, they can still receive old-epoch messages. In most cases redeliverUnacknowledgedMessages is used with cumulative ack, so the user should call receive and redeliverUnacknowledgedMessages in the same thread. So we only need to ensure that offering a message to the queue and redeliverUnacknowledgedMessages are mutually exclusive. You can use the same thread or a lock to implement that.
> @codelipenghui It seems we cannot judge the consumer epoch before offering to the incoming queue, because when the incoming queue already holds messages and the user invokes redeliverUnacknowledgedMessages, those messages, now with an invalid epoch, will still be returned to the user.

Should we clean up the receiver queue when users invoke the redeliverUnacknowledgedMessages method?
> Because the user may call receive and redeliverUnacknowledgedMessages from multiple threads, the user may still receive old-epoch messages in this case. (Note: this is not good usage, but it may exist.)

We can make them happen on a single thread. We already use the pinned thread to complete the receive message request:
https://github.com/apache/pulsar/blob/311ca9049f5be72e8d1c5df7dd984afc975ba9d8/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L474-L489
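The single-thread approach could look roughly like the sketch below. It is only an illustration under stated assumptions: PinnedThreadConsumer and its fields are hypothetical, messages are reduced to their epoch numbers, and a plain single-thread executor stands in for the client's pinned thread. Because both operations run on the same thread, the epoch check plus offer can never interleave with the epoch bump plus queue clear.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of thread confinement; not the real ConsumerImpl.
final class PinnedThreadConsumer implements AutoCloseable {
    // Stand-in for the consumer's pinned executor.
    private final ExecutorService pinned = Executors.newSingleThreadExecutor();
    // Both fields are touched only from the pinned thread.
    private final Queue<Long> incoming = new ArrayDeque<>();
    private long consumerEpoch = 0;

    // Judge the epoch and offer to the queue as one task on the pinned thread.
    CompletableFuture<Boolean> messageReceived(long messageEpoch) {
        return CompletableFuture.supplyAsync(() -> {
            if (messageEpoch < consumerEpoch) {
                return false; // stale message never reaches the queue
            }
            incoming.add(messageEpoch);
            return true;
        }, pinned);
    }

    // Bump the epoch and clear the queue as one task on the pinned thread.
    // Completes with the number of queued messages that were dropped.
    CompletableFuture<Integer> redeliverUnacknowledgedMessages() {
        return CompletableFuture.supplyAsync(() -> {
            consumerEpoch++;
            int dropped = incoming.size();
            incoming.clear();
            return dropped;
        }, pinned);
    }

    @Override
    public void close() {
        pinned.shutdown();
    }
}
```

The trade-off raised in this thread still applies: any method that touches the queue directly, without going through the pinned thread, bypasses this guarantee.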
> Should we clean up the receiver queue when users invoke the redeliverUnacknowledgedMessages method?

Yes. That way, the epoch can be judged before offering to the queue. @congbobo184 What do you think?
> We can make them happen on a single thread. We already use the pinned thread to complete the receive message request.

We also have methods that are invoked directly and don't use the pinned thread:
https://github.com/apache/pulsar/blob/1df63495f3ba6244391cffefb4081e22371c9b4a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L441-L458
@shibd

> Should we clean up the receiver queue when users invoke the redeliverUnacknowledgedMessages method?

> We can make them happen on a single thread. We already use the pinned thread to complete the receive message request.

> Yes. That way, the epoch can be judged before offering to the queue. @congbobo184 What do you think?

I think yes. We only need to guarantee that offering to the queue and redeliverUnacknowledgedMessages run in one thread; we can use the pinned thread.

> We also have methods that are invoked directly and don't use the pinned thread.

In this case, we don't need to think about internalReceive:
https://github.com/apache/pulsar/blob/1df63495f3ba6244391cffefb4081e22371c9b4a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L441-L458
@codelipenghui @congbobo184
I added a new incomingQueueLock to synchronize two code blocks: (judge the epoch and offer to the incoming queue) and (change the epoch and clear the incoming queue). PTAL.
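For readers following the thread, the locking scheme described above might be sketched like this. Only the name incomingQueueLock comes from the comment; the class LockedIncomingQueue, its methods, and the reduction of messages to bare epoch numbers are assumptions made for illustration, and the actual change in the PR may be structured differently.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the two critical sections guarded by one lock.
final class LockedIncomingQueue {
    private final ReentrantLock incomingQueueLock = new ReentrantLock();
    private final Queue<Long> incoming = new ArrayDeque<>(); // message epochs only
    private long consumerEpoch = 0;

    // Critical section 1: judge the epoch and offer to the incoming queue.
    boolean offerIfCurrent(long messageEpoch) {
        incomingQueueLock.lock();
        try {
            if (messageEpoch < consumerEpoch) {
                return false; // stale message is dropped before enqueueing
            }
            incoming.add(messageEpoch);
            return true;
        } finally {
            incomingQueueLock.unlock();
        }
    }

    // Critical section 2: change the epoch and clear the incoming queue.
    // Returns how many queued messages were discarded.
    int redeliver() {
        incomingQueueLock.lock();
        try {
            consumerEpoch++;
            int dropped = incoming.size();
            incoming.clear();
            return dropped;
        } finally {
            incomingQueueLock.unlock();
        }
    }

    int size() {
        incomingQueueLock.lock();
        try {
            return incoming.size();
        } finally {
            incomingQueueLock.unlock();
        }
    }
}
```

Because both sections take the same lock, a message checked against the old epoch can no longer slip into the queue after the queue has been cleared for the new epoch.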
/pulsarbot run-failure-checks