Pause and Resume Functionality in Processor

Open johnmehan opened this issue 2 years ago • 4 comments

Would it be possible to add functionality to pause and resume the Processor?

The sarama.Consumer already has these methods: Pause, PauseAll, Resume, ResumeAll.

Thanks

johnmehan avatar Nov 30 '22 22:11 johnmehan

@johnmehan Yes, it is possible to add pause and resume functionality. Could you share the use case you need it for?

roubanakhle avatar Dec 13 '22 12:12 roubanakhle

@roubanakhle To improve performance, our callback pushes each message onto a queue, and a separate process consumes from that queue and commits each message once it has been processed. This gave us a substantial speed-up, but we need to pause and resume our consumer to keep memory usage under control.

johnmehan avatar Dec 13 '22 16:12 johnmehan
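
The backpressure pattern described in the comment above could be sketched roughly as follows. This is only an illustration under assumptions: `Backlog`, `NewBacklog`, and the `pause`/`resume` hooks are hypothetical names, not part of goka or sarama, and the hooks stand in for whatever pause and resume calls the consumer ends up exposing. The sketch pauses at a high watermark and resumes only after the backlog has drained to a low watermark, so the consumer does not flap on every message.

```go
package main

import "sync"

// Backlog counts messages that were queued but not yet committed and
// toggles the consumer through two caller-supplied hooks.
// pause and resume are hypothetical stand-ins, not goka API.
type Backlog struct {
	mu      sync.Mutex
	pending int
	low     int
	high    int
	paused  bool
	pause   func()
	resume  func()
}

// NewBacklog returns a Backlog that pauses once `high` messages are pending
// and resumes once the count has dropped back to `low`.
func NewBacklog(low, high int, pause, resume func()) *Backlog {
	return &Backlog{low: low, high: high, pause: pause, resume: resume}
}

// Add records one queued message; call it from the processor callback.
func (b *Backlog) Add() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending++
	if !b.paused && b.pending >= b.high {
		b.paused = true
		b.pause()
	}
}

// Done records one processed message; call it after the commit.
func (b *Backlog) Done() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.pending > 0 {
		b.pending--
	}
	if b.paused && b.pending <= b.low {
		b.paused = false
		b.resume()
	}
}
```

The hooks run while the mutex is held, which keeps the paused flag and the actual consumer state in step but means the hooks should return quickly. The gap between the two watermarks is a tuning choice that depends on message size and the memory budget.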

@johnmehan sorry for the long delay, I gave it a quick shot in this PR: https://github.com/lovoo/goka/pull/411. It's not tested and all, but would that be something you had in mind?

frairon avatar Dec 25 '22 15:12 frairon

This seems to be exactly what we need. Our use case is that we have a service A that depends on another service B for its lookups. Service A consumes diagnostics, but for each consumed message it needs to check whether the source of the diagnostic message is still in active service (we want to ignore diagnostics from inactive sources).

Whether the source is active or inactive is determined by service B. To service A, these two scenarios will look identical:

  1. Source is inactive (i.e. not found in "active sources registry")
  2. Service B has stopped (causing TTLs to expire in "active sources registry")

To tell these two cases apart, we can provide a heartbeat from service B and, if the heartbeats stop, pause consuming diagnostics in service A until heartbeats resume.

mariusw avatar Mar 28 '23 13:03 mariusw