pulsar icon indicating copy to clipboard operation
pulsar copied to clipboard

[improve][java-client] check consumer pause status before consumer receive/batchReceive

Open yapxue opened this issue 3 years ago • 3 comments

Motivation

*consumer client has method pause(), but consumer is not really paused. paused consumer only stop send flow permit request to broker but broker still can push messages to consumer if there is already existed flow permit. consumer can get messages if it invoked consumer.receive(). This may mislead users since no message is expected after pause. *

Modifications

park for some time if consumer is paused. consumer hold prefetched messages in client cache.

Verifying this change

  • [ ] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

Check the box below or label this PR directly.

Need to update docs?

  • [ ] doc-required (Your PR needs to update docs and you will update later)

  • [ x] doc-not-needed (already documented)

  • [ ] doc (Your PR contains doc changes)

  • [ ] doc-complete (Docs have been already added)

yapxue avatar Aug 19 '22 08:08 yapxue

the doc for Consumer#pause is

/**
 * Stop requesting new messages from the broker until {@link #resume()} is called. Note that this might cause
 * {@link #receive()} to block until {@link #resume()} is called and new messages are pushed by the broker.
 */

it doesn't say anything about stop receiving message instantly, I think current behavior is acceptable?

MarvinCai avatar Aug 21 '22 05:08 MarvinCai

the doc for Consumer#pause is

/**
 * Stop requesting new messages from the broker until {@link #resume()} is called. Note that this might cause
 * {@link #receive()} to block until {@link #resume()} is called and new messages are pushed by the broker.
 */

it doesn't say anything about stop receiving message instantly, I think current behavior is acceptable?

User may expect stop receive instanly. Is it possible to change the api doc? For example in kafka, after pause is invoked, consumer just have empty poll and prefeched messages stay in cache. This mismatched behavior may surprise users migrated from kafka.

yapxue avatar Aug 22 '22 01:08 yapxue

The pr had no activity for 30 days, mark with Stale label.

github-actions[bot] avatar Sep 23 '22 02:09 github-actions[bot]