[fix][client] Messages with inconsistent consumer epochs are not filtered when using batch receive and trigger timeout.

Open shibd opened this issue 3 years ago • 12 comments

Motivation

When a batch receive is completed by its timeout, messages with inconsistent consumer epochs are not filtered out.

Modifications

  • Add the consumer epoch check to notifyPendingBatchReceivedCallBack.
  • Reuse the notifyPendingBatchReceivedCallBack logic.
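
As a rough illustration of the filtering described above, here is a minimal sketch under a simplified queue model. `EpochFilterSketch`, `EpochMessage`, and `drainBatch` are hypothetical names used only for this example, not Pulsar client classes, and the rule "an epoch smaller than the current one is stale" is an assumption taken from the discussion in this thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class EpochFilterSketch {
    /** Hypothetical stand-in for a received message and the epoch it was dispatched under. */
    static final class EpochMessage {
        final String payload;
        final long consumerEpoch;

        EpochMessage(String payload, long consumerEpoch) {
            this.payload = payload;
            this.consumerEpoch = consumerEpoch;
        }
    }

    /**
     * Builds one batch from the incoming queue. Messages whose epoch is smaller
     * than currentEpoch are dropped instead of being handed to the user.
     */
    static List<EpochMessage> drainBatch(Queue<EpochMessage> incoming, long currentEpoch, int maxMessages) {
        List<EpochMessage> batch = new ArrayList<>();
        EpochMessage msg;
        while (batch.size() < maxMessages && (msg = incoming.poll()) != null) {
            if (msg.consumerEpoch >= currentEpoch) {
                batch.add(msg);
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        Queue<EpochMessage> incoming = new ArrayDeque<>();
        incoming.add(new EpochMessage("m1", 0)); // dispatched before a redelivery request
        incoming.add(new EpochMessage("m2", 1));
        incoming.add(new EpochMessage("m3", 1));

        // Using one drain routine for every completion path (batch full or timeout)
        // is what keeps the timeout path from skipping the epoch check.
        List<EpochMessage> batch = drainBatch(incoming, 1, 10);
        for (EpochMessage m : batch) {
            System.out.println(m.payload + " epoch=" + m.consumerEpoch);
        }
    }
}
```

Routing both completion paths through a single method is the design point here: the check cannot be forgotten on one path if there is only one path.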

Verifying this change

  • The testBatchReceiveRedeliveryAddEpoch unit test covers this scenario.

Documentation

  • [x] doc-not-needed

Matching PR in forked repository

https://github.com/shibd/pulsar/pull/12

shibd avatar Aug 28 '22 16:08 shibd

/pulsarbot run-failure-checks

shibd avatar Aug 29 '22 15:08 shibd

@nodece Thanks, all comments fixed, PTAL.

shibd avatar Sep 06 '22 07:09 shibd

/pulsarbot run-failure-checks

shibd avatar Sep 14 '22 00:09 shibd

@congbobo184 Do you remember why the consumer epoch check happened after polling the messages from the receiver queue?

@shibd @congbobo184 Can we move it to the beginning of message receiving?

https://github.com/apache/pulsar/blob/69f3f7471fa6faf24d4776d65e0509538c105d37/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1271-L1285

It looks like if a message has an invalid epoch, we don't need to add it to the receiver queue. Adding it also introduces a client-side stats issue: users never see that message, but the stats show the received message count increasing.

https://github.com/apache/pulsar/blob/69f3f7471fa6faf24d4776d65e0509538c105d37/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1628-L1644

Messages with an invalid epoch should be treated as skipped messages, like here:

https://github.com/apache/pulsar/blob/69f3f7471fa6faf24d4776d65e0509538c105d37/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1559

It also makes sense to add skipped messages to ConsumerStats.
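
To make the stats concern concrete, here is a small sketch that assumes a separate "skipped" counter next to the "received" counter. `SkippedStatsSketch` and its methods are hypothetical and do not reflect the actual ConsumerStats API.

```java
import java.util.concurrent.atomic.LongAdder;

final class SkippedStatsSketch {
    private final LongAdder received = new LongAdder();
    private final LongAdder skipped = new LongAdder();

    /** Classifies one incoming message; returns true if it should be enqueued. */
    boolean onMessage(long messageEpoch, long consumerEpoch) {
        if (messageEpoch < consumerEpoch) {
            skipped.increment(); // stale epoch: the user never sees this message
            return false;
        }
        received.increment();
        return true;
    }

    long receivedCount() {
        return received.sum();
    }

    long skippedCount() {
        return skipped.sum();
    }

    public static void main(String[] args) {
        SkippedStatsSketch stats = new SkippedStatsSketch();
        stats.onMessage(0, 1); // stale
        stats.onMessage(1, 1); // valid
        stats.onMessage(2, 1); // valid
        System.out.println("received=" + stats.receivedCount() + " skipped=" + stats.skippedCount());
        // prints "received=2 skipped=1"
    }
}
```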

codelipenghui avatar Sep 19 '22 03:09 codelipenghui

@codelipenghui Because if we check the epoch before polling the message from the receiver queue, and the user invokes redeliverUnacknowledgedMessages at the same time the client is checking the message epoch, a message with an invalid epoch can still be offered to the queue. The user can then still receive a message with a smaller epoch.

congbobo184 avatar Sep 19 '22 11:09 congbobo184

@codelipenghui Because if we check the epoch before polling the message from the receiver queue, and the user invokes redeliverUnacknowledgedMessages at the same time the client is checking the message epoch, a message with an invalid epoch can still be offered to the queue. The user can then still receive a message with a smaller epoch.

It looks like a race condition here. Is it possible to introduce a lock or use the same thread for messageReceived and redeliverUnacknowledgedMessages to resolve the issue?

My point is that we should keep messages with an invalid epoch out of the queue in the first place, so that we don't need to handle this case separately in receive() and batchReceive(). As I described in https://github.com/apache/pulsar/pull/17318#issuecomment-1250524811, it also distorts client stats.

codelipenghui avatar Sep 19 '22 11:09 codelipenghui

It looks like a race condition here. Is it possible to introduce a lock or use the same thread for messageReceived and redeliverUnacknowledgedMessages to resolve the issue?

@codelipenghui It seems we cannot check the consumer epoch only before offering to the incoming queue. When the incoming queue already holds messages and the user invokes redeliverUnacknowledgedMessages, those messages now have an invalid epoch and will still be returned to the user.

It looks like the consumer epoch should be checked before returning the message to the user.

@congbobo184 However, even so, in extreme cases, we cannot avoid the problem described below.

@codelipenghui Because if we check the epoch before polling the message from the receiver queue, and the user invokes redeliverUnacknowledgedMessages at the same time the client is checking the message epoch, a message with an invalid epoch can still be offered to the queue. The user can then still receive a message with a smaller epoch.

Because the user may call receive and redeliverUnacknowledgedMessages from multiple threads, the user may still receive old-epoch messages in that case. (Note: this is not good usage, but it may exist.)

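while (true) {
  Message<byte[]> msg = consumer.receive();
  // This message may carry an invalid (stale) consumer epoch.
  if (needCumulativeAck) {
    try {
      consumer.acknowledgeCumulative(msg);
    } catch (PulsarClientException e) {
      // Redelivery runs on a different thread than receive(), so the two can race.
      new Thread(() -> consumer.redeliverUnacknowledgedMessages()).start();
    }
  }
}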

shibd avatar Sep 19 '22 15:09 shibd

@shibd Yes, if the user calls receive and redeliverUnacknowledgedMessages from multiple threads, old-epoch messages can still be received. In most cases, redeliverUnacknowledgedMessages is used together with cumulative ack, so the user should call receive and redeliverUnacknowledgedMessages on the same thread. So we only need to ensure that offering a message to the queue and redeliverUnacknowledgedMessages are mutually exclusive. You can use the same thread or a lock to implement it.

congbobo184 avatar Sep 20 '22 08:09 congbobo184

@codelipenghui It seems we cannot check the consumer epoch only before offering to the incoming queue. When the incoming queue already holds messages and the user invokes redeliverUnacknowledgedMessages, those messages now have an invalid epoch and will still be returned to the user.

Should we clear the receiver queue when users invoke redeliverUnacknowledgedMessages?

Because the user may call receive and redeliverUnacknowledgedMessages from multiple threads, the user may still receive old-epoch messages in that case. (Note: this is not good usage, but it may exist.)

We can make them happen on a single thread. We already use the pinned thread to complete the receive request:

https://github.com/apache/pulsar/blob/311ca9049f5be72e8d1c5df7dd984afc975ba9d8/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L474-L489
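
A minimal sketch of the single-thread idea, assuming a dedicated single-thread executor stands in for the pinned thread. `PinnedThreadSketch` and its methods are hypothetical and only model epochs as plain numbers; they are not the ConsumerImpl code linked above.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PinnedThreadSketch {
    // All state below is touched only from the single pinned thread.
    private final ExecutorService pinned = Executors.newSingleThreadExecutor();
    private final Queue<Long> incoming = new ArrayDeque<>(); // stores message epochs only
    private long consumerEpoch = 0;

    /** Simulates a message arriving from the broker; the result is true if it was enqueued. */
    Future<Boolean> messageReceived(long messageEpoch) {
        return pinned.submit(() -> {
            if (messageEpoch < consumerEpoch) {
                return false; // stale epoch: never reaches the queue
            }
            return incoming.offer(messageEpoch);
        });
    }

    /** Simulates a redelivery request: clear the queue and move to the next epoch. */
    Future<Long> redeliver() {
        return pinned.submit(() -> {
            incoming.clear();
            return ++consumerEpoch;
        });
    }

    Future<Integer> queuedCount() {
        return pinned.submit(() -> {
            return incoming.size();
        });
    }

    void close() {
        pinned.shutdown();
    }

    public static void main(String[] args) throws Exception {
        PinnedThreadSketch consumer = new PinnedThreadSketch();
        consumer.messageReceived(0).get(); // accepted under epoch 0
        consumer.redeliver().get();        // epoch becomes 1, queue cleared
        boolean staleAccepted = consumer.messageReceived(0).get();
        boolean freshAccepted = consumer.messageReceived(1).get();
        System.out.println(staleAccepted + " " + freshAccepted + " " + consumer.queuedCount().get());
        // prints "false true 1"
        consumer.close();
    }
}
```

Because every task runs on the same thread, the epoch check and the offer cannot interleave with the epoch change and the queue clear, so no explicit lock is needed in this model.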

codelipenghui avatar Sep 20 '22 08:09 codelipenghui

Should we clear the receiver queue when users invoke redeliverUnacknowledgedMessages?

Yes. That way, the epoch can be checked before offering to the queue. @congbobo184 What do you think?

We can make them happen on a single thread. We already use the pinned thread to complete the receive request.

We also have methods that are invoked directly and do not use the pinned thread.

https://github.com/apache/pulsar/blob/1df63495f3ba6244391cffefb4081e22371c9b4a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L441-L458

shibd avatar Sep 20 '22 09:09 shibd

@shibd

Should we clear the receiver queue when users invoke redeliverUnacknowledgedMessages?

We can make them happen on a single thread. We already use the pinned thread to complete the receive request.

Yes. That way, the epoch can be checked before offering to the queue. @congbobo184 What do you think?

I think yes. We only need to guarantee that offering to the queue and redeliverUnacknowledgedMessages run on one thread, so we can use the pinned thread.

We also have methods that are invoked directly and do not use the pinned thread.

In this case, we don't need to think about internalReceive:

https://github.com/apache/pulsar/blob/1df63495f3ba6244391cffefb4081e22371c9b4a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L441-L458

congbobo184 avatar Sep 20 '22 09:09 congbobo184

@codelipenghui @congbobo184

I added a new incomingQueueLock to synchronize two code blocks: (check the epoch and offer to the incoming queue) and (change the epoch and clear the incoming queue). PTAL.
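
For readers following along, the locking scheme can be sketched roughly like this. Only the name `incomingQueueLock` comes from this thread; `IncomingQueueLockSketch`, its methods, and the use of plain numbers as epochs are assumptions made for illustration, not the code in the PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

final class IncomingQueueLockSketch {
    private final ReentrantLock incomingQueueLock = new ReentrantLock();
    private final Queue<Long> incomingMessages = new ArrayDeque<>(); // stores message epochs only
    private long consumerEpoch = 0;

    /** Block 1: check the epoch and offer to the queue as one atomic step. */
    boolean offerIfEpochValid(long messageEpoch) {
        incomingQueueLock.lock();
        try {
            if (messageEpoch < consumerEpoch) {
                return false; // stale epoch: rejected before it reaches the queue
            }
            return incomingMessages.offer(messageEpoch);
        } finally {
            incomingQueueLock.unlock();
        }
    }

    /** Block 2: change the epoch and clear the queue as one atomic step. */
    long redeliverUnacknowledged() {
        incomingQueueLock.lock();
        try {
            consumerEpoch++;
            incomingMessages.clear();
            return consumerEpoch;
        } finally {
            incomingQueueLock.unlock();
        }
    }

    int queuedCount() {
        incomingQueueLock.lock();
        try {
            return incomingMessages.size();
        } finally {
            incomingQueueLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        IncomingQueueLockSketch consumer = new IncomingQueueLockSketch();
        Thread broker = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                consumer.offerIfEpochValid(0); // every message was dispatched under epoch 0
            }
        });
        broker.start();
        consumer.redeliverUnacknowledged(); // races with the offers above
        broker.join();
        // Offers made before the epoch change were cleared; later ones were rejected.
        System.out.println("stale messages left in queue: " + consumer.queuedCount());
        // prints "stale messages left in queue: 0"
    }
}
```

Holding the same lock around both blocks means a stale message is either enqueued before the clear (and removed by it) or checked after the epoch change (and rejected), which covers the race discussed earlier in the thread.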

shibd avatar Sep 21 '22 12:09 shibd

/pulsarbot run-failure-checks

shibd avatar Oct 10 '22 13:10 shibd