[improve][java-client] check consumer pause status before consumer receive/batchReceive
Motivation
*consumer client has method pause(), but consumer is not really paused. paused consumer only stop send flow permit request to broker but broker still can push messages to consumer if there is already existed flow permit. consumer can get messages if it invoked consumer.receive(). This may mislead users since no message is expected after pause. *
Modifications
park for some time if consumer is paused. consumer hold prefetched messages in client cache.
Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
(example:)
- Added integration tests for end-to-end deployment with large payloads (10MB)
- Extended integration test for recovery after broker failure
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
Documentation
Check the box below or label this PR directly.
Need to update docs?
-
[ ]
doc-required(Your PR needs to update docs and you will update later) -
[ x]
doc-not-needed(already documented) -
[ ]
doc(Your PR contains doc changes) -
[ ]
doc-complete(Docs have been already added)
the doc for Consumer#pause is
/**
* Stop requesting new messages from the broker until {@link #resume()} is called. Note that this might cause
* {@link #receive()} to block until {@link #resume()} is called and new messages are pushed by the broker.
*/
it doesn't say anything about stop receiving message instantly, I think current behavior is acceptable?
the doc for
Consumer#pauseis/** * Stop requesting new messages from the broker until {@link #resume()} is called. Note that this might cause * {@link #receive()} to block until {@link #resume()} is called and new messages are pushed by the broker. */it doesn't say anything about stop receiving message instantly, I think current behavior is acceptable?
User may expect stop receive instanly. Is it possible to change the api doc? For example in kafka, after pause is invoked, consumer just have empty poll and prefeched messages stay in cache. This mismatched behavior may surprise users migrated from kafka.
The pr had no activity for 30 days, mark with Stale label.