CAMEL-20227: camel-kafka - Kafka offset advances when using Pausable EIP and messages are lost when resumed

davsclaus opened this pull request 1 year ago • 8 comments

Description

Target

  • [ ] I checked that the commit is targeting the correct branch (note that Camel 3 uses camel-3.x, whereas Camel 4 uses the main branch)

Tracking

  • [ ] If this is a large change, bug fix, or code improvement, I checked there is a JIRA issue filed for the change (usually before you start working on it).

Apache Camel coding standards and style

  • [ ] I checked that each commit in the pull request has a meaningful subject line and body.
  • [ ] I have run mvn clean install -DskipTests locally and I have committed all auto-generated changes

davsclaus, Jul 05 '24 10:07

:star2: Thank you for your contribution to the Apache Camel project! :star2:

:robot: CI automation will test this PR automatically.

:camel: Apache Camel Committers, please review the following items:

  • First-time contributors require MANUAL approval for the GitHub Actions to run

  • You can use the command /component-test (camel-)component-name1 (camel-)component-name2 ... to request a test from the test bot.

  • You can label PRs using build-all, build-dependents, skip-tests and test-dependents to fine-tune the checks executed by this PR.

  • Build and test logs are available in the Summary page. Only Apache Camel committers have access to the summary.

  • :warning: Be careful when sharing logs. Review their contents before sharing them publicly.

github-actions[bot], Jul 05 '24 10:07

Argh, yeah, this test is a bit wrong: you want to be able to consume all 15 messages, but the consumer is paused in the middle.

davsclaus, Jul 05 '24 10:07

Okay, I improved this so that pause and resume do not lose messages, and so that the pause waits one poll cycle; otherwise it keeps looping as fast as possible, which is not desirable (it eats CPU and goes too fast).
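
The two goals of that change (no lost messages across pause/resume, and one poll cycle of waiting while paused) can be made concrete with a toy Python model. It does not use Camel or the Kafka client: the partition is a plain list, and run_consumer, pause_at and paused_cycles are hypothetical names. Rewinding the position to the failed record stands in for what a real consumer would do with seek(); the actual change may work differently.

```python
import time
from typing import Callable, List, Tuple


def run_consumer(
    messages: List[str],
    pause_at: int,          # hypothetical: offset whose first processing attempt hits a transient failure
    paused_cycles: int,     # hypothetical: how many poll cycles the transient condition lasts
    max_poll_records: int = 5,
    poll_timeout: float = 0.01,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[List[str], int]:
    """Toy poll loop: returns (processed records, number of idle waits while paused)."""
    position = 0            # next offset to fetch, like the consumer position
    processed: List[str] = []
    paused = False
    remaining = paused_cycles
    failed_once = False
    idle_waits = 0

    while position < len(messages):
        if paused:
            # While paused nothing is fetched. Wait one poll cycle instead of
            # spinning in a tight loop that eats CPU.
            sleep(poll_timeout)
            idle_waits += 1
            remaining -= 1
            if remaining <= 0:
                paused = False  # transient condition cleared: resume
            continue

        batch = messages[position:position + max_poll_records]
        for offset, record in enumerate(batch, start=position):
            if offset == pause_at and not failed_once:
                failed_once = True
                # Rewind to the failed record so it is redelivered after resume,
                # instead of letting the position advance past it (message loss).
                position = offset
                paused = True
                break
            processed.append(record)
        else:
            position += len(batch)  # whole batch processed: move on
    return processed, idle_waits
```

With 15 messages and a failure at offset 7 lasting three cycles, all 15 records come out exactly once and in order, with three idle waits while paused.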

davsclaus, Jul 05 '24 10:07

Cool. It's a holiday here, but I will try to take a look later today

orpiske, Jul 05 '24 10:07

At first it wasn't very clear to me how the user would be losing messages after the pause.

The KafkaConsumer documentation describes pause as follows: "Suspend fetching from the requested partitions. Future calls to poll(Duration) will not return any records from these partitions until they have been resumed using resume(Collection)".
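
That contract can be illustrated with a toy in-memory model. ToyConsumer is hypothetical: its pause, resume, paused and poll methods only mirror the names of the real KafkaConsumer methods, and poll here takes no timeout and returns a plain dict instead of ConsumerRecords. It shows why pausing by itself should not lose anything: a paused partition returns no records and its position does not move.

```python
from typing import Dict, Iterable, List, Set


class ToyConsumer:
    """Toy model of KafkaConsumer pause/resume semantics (not the real client)."""

    def __init__(self, logs: Dict[str, List[str]], max_poll_records: int = 2) -> None:
        self.logs = logs                              # partition name -> records
        self.positions = {tp: 0 for tp in logs}       # next offset per partition
        self._paused: Set[str] = set()
        self.max_poll_records = max_poll_records

    def pause(self, partitions: Iterable[str]) -> None:
        self._paused.update(partitions)

    def resume(self, partitions: Iterable[str]) -> None:
        self._paused.difference_update(partitions)

    def paused(self) -> Set[str]:
        return set(self._paused)

    def poll(self) -> Dict[str, List[str]]:
        # Paused partitions are skipped: no records are returned and their
        # position stays where it was, so nothing is skipped on resume.
        result: Dict[str, List[str]] = {}
        for tp, log in self.logs.items():
            if tp in self._paused:
                continue
            start = self.positions[tp]
            batch = log[start:start + self.max_poll_records]
            if batch:
                result[tp] = batch
                self.positions[tp] = start + len(batch)
        return result
```

For example, after pausing "p0", polls return only "p1" records; after resume, "p0" continues from offset 0.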

But looking more closely at the reproducer, I think I kinda understand the problem. It is partly caused by a misuse of the API (and partly by poor documentation for this particular feature).

In particular, our documentation states:

"The pausable EIP is meant to be used as a support mechanism when there is an exception somewhere in the route that prevents the exchange from being processed. More specifically, the check called by the pausable EIP should be used to test for transient conditions preventing the exchange from being processed."

Specifically, the pause for those transient problems should be triggered by an exception, so that the processing result is a failure. Otherwise, the pause is never triggered, the offset keeps advancing, and the user loses the messages.

So, in short: the user's reproducer is wrong.
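
The difference between the two kinds of route can be sketched with a toy model, assuming a consumer that commits the offset whenever the route returns normally and pauses (without committing) when it raises. TransientError, Downstream, consume and the two route functions are hypothetical names for illustration, not Camel APIs.

```python
from typing import Callable, List, Tuple


class TransientError(Exception):
    """Hypothetical exception signalling a transient problem (e.g. downstream down)."""


class Downstream:
    """Hypothetical downstream system that may be temporarily unavailable."""

    def __init__(self, up: bool) -> None:
        self.up = up
        self.delivered: List[str] = []


def consume(records: List[str], route: Callable[[str], None]) -> Tuple[int, bool]:
    """Toy pausable consumer; returns (committed_offset, paused).

    A record whose route returns normally is committed; a TransientError
    stops consumption without committing, i.e. the consumer pauses.
    """
    committed = 0
    for offset, record in enumerate(records):
        try:
            route(record)
        except TransientError:
            return committed, True  # pause; the record will be redelivered
        committed = offset + 1      # success: the offset moves past the record
    return committed, False


downstream = Downstream(up=False)


def route_that_throws(record: str) -> None:
    # Intended usage: report the transient condition as a failure.
    if not downstream.up:
        raise TransientError("downstream unavailable")
    downstream.delivered.append(record)


def route_that_skips(record: str) -> None:
    # Misuse: the condition is checked but nothing fails, so the exchange
    # "succeeds", the offset is committed and the record is silently dropped.
    if downstream.up:
        downstream.delivered.append(record)
```

With the downstream system down, the throwing route leaves the committed offset at 0 and pauses, so r0 is redelivered later; the skipping route commits offset 2 and neither record is ever delivered.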

orpiske, Jul 05 '24 14:07

That said, I think we do have room for improvement in the API. I am not sure we should allow the code to resume the assignment if pause has not been called.

Maybe we could throw an exception explaining that this is a misuse of the API and that forcing a resume without an underlying transient problem is not supported.
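
A minimal sketch of that proposal, assuming the pause state is tracked in one place. PauseGuard and PausableMisuseError are hypothetical names, and raising on an unmatched resume is only the suggestion above, not existing Camel behavior.

```python
class PausableMisuseError(RuntimeError):
    """Hypothetical error raised when resume is requested without a prior pause."""


class PauseGuard:
    """Hypothetical guard around the consumer's pause/resume state.

    resume() is only legal after pause() was triggered by a transient failure;
    otherwise it raises instead of silently resuming the assignment.
    """

    def __init__(self) -> None:
        self._paused = False

    @property
    def is_paused(self) -> bool:
        return self._paused

    def pause(self) -> None:
        self._paused = True

    def resume(self) -> None:
        if not self._paused:
            raise PausableMisuseError(
                "resume() called without a prior pause(): forcing a resume "
                "without an underlying transient problem is not supported"
            )
        self._paused = False
```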

orpiske, Jul 05 '24 14:07

Yeah, okay, that can be better explained in the docs and javadoc. Also, the API uses a predicate, so an exception is not thrown.

If the intent is to pause/resume Kafka via some logic, then use a RoutePolicy, which is intended for that. There is a throttling policy out of the box, but you can write your own as well.
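
For comparison, the hysteresis behind an inflight-based throttling policy can be sketched as below. This is loosely modeled on Camel's ThrottlingInflightRoutePolicy (suspend the route's consumer when too many exchanges are in flight, resume once the count drops to a percentage of the maximum); the class name and the exact threshold comparisons are assumptions, and the method names only echo the onExchangeBegin/onExchangeDone callbacks of RoutePolicy.

```python
class InflightThrottle:
    """Hypothetical model of an inflight-based throttling route policy."""

    def __init__(self, max_inflight: int, resume_percent: int = 70) -> None:
        self.max_inflight = max_inflight
        # Resume threshold as a percentage of the maximum (integer arithmetic).
        self.resume_at = max_inflight * resume_percent // 100
        self.inflight = 0
        self.suspended = False

    def on_exchange_begin(self) -> None:
        self.inflight += 1
        if not self.suspended and self.inflight >= self.max_inflight:
            self.suspended = True   # stop taking new messages from the consumer

    def on_exchange_done(self) -> None:
        self.inflight -= 1
        if self.suspended and self.inflight <= self.resume_at:
            self.suspended = False  # enough work drained: resume consuming
```

The gap between the suspend and resume thresholds keeps the route from flapping between suspended and resumed on every exchange.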

davsclaus, Jul 07 '24 06:07

@davsclaus actually, this is different from pausing the route per se. The idea of the pausable API is to pause the Kafka consumer (so as not to cause a rebalance when there is a transient issue).

Nonetheless, let's see if we can brainstorm some improvements to this once you are back. There are some specific requirements for pausing the KafkaConsumer (for example, it must be done from the same thread that calls 'poll'), but if we could do this using the RoutePolicy, we could retire this API and make it simpler for users.
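
The same-thread requirement (KafkaConsumer is not safe for multi-threaded access) is commonly handled by letting other threads only request a pause or resume and having the polling thread apply it between polls. The sketch below is a hypothetical Python model of that hand-off; PollLoop and its methods are illustrative names, and the real consumer.poll call is replaced by a short wait. Because the loop keeps cycling while paused, the consumer can keep polling (a paused partition simply returns no records), which is what avoids the rebalance.

```python
import queue
import threading
from typing import List, Tuple


class PollLoop:
    """Hypothetical sketch: only the polling thread touches the consumer state."""

    def __init__(self) -> None:
        self.requests: "queue.Queue[str]" = queue.Queue()
        self.paused = False
        self.applied: List[Tuple[str, int]] = []  # (command, applying thread ident)
        self._stop = threading.Event()
        self.thread = threading.Thread(target=self._run, daemon=True)

    def request(self, command: str) -> None:
        # Safe to call from any thread: it only enqueues "pause" or "resume".
        self.requests.put(command)

    def _drain(self) -> None:
        # Apply pending requests; always runs on the poll thread.
        while True:
            try:
                command = self.requests.get_nowait()
            except queue.Empty:
                return
            self.paused = command == "pause"
            self.applied.append((command, threading.get_ident()))

    def _run(self) -> None:
        while not self._stop.is_set():
            self._drain()
            # consumer.poll(timeout) would run here, even while paused;
            # this model just waits for one short poll cycle.
            self._stop.wait(0.01)
        self._drain()  # apply anything requested before stop()

    def start(self) -> None:
        self.thread.start()

    def stop(self) -> None:
        self._stop.set()
        self.thread.join()
```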

orpiske, Jul 07 '24 07:07

closing old PR

davsclaus, Oct 30 '24 19:10