KAFKA-17439: Make polling for new records an explicit action/event in the new consumer
Updated the `FetchRequestManager` to only create and enqueue fetch requests when signaled to do so by a `FetchEvent`.
The application thread and the background thread each contain logic that runs only if there is buffered data from a previous fetch. This creates a race condition, because the presence of buffered data can change between the two threads' respective checks. Right now the window for the race condition is wide open; this change narrows that window but does not close it.
In the `ClassicKafkaConsumer`, the application thread explicitly issues fetch requests (via the `Fetcher` class) at specific points in the `Consumer.poll()` cycle. Prior to this change, the `AsyncKafkaConsumer` would issue fetch requests independently of the user calling `Consumer.poll()`; the fetches would happen nearly continuously as soon as any assigned partition was fetchable. With this change, the `AsyncKafkaConsumer` introduces a `FetchEvent` that signals to the background thread that a fetch request should be issued. The specific points where this is done in the `Consumer.poll()` cycle of the `AsyncKafkaConsumer` now match the `ClassicKafkaConsumer`. In short: this makes the `AsyncKafkaConsumer` behave nearly identically to the `ClassicKafkaConsumer` in this regard.
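The signaling pattern described above can be illustrated with a small, self-contained sketch. This is not the code in this PR: `FetchEventSketch`, `SketchFetchRequestManager`, and the string-based "requests" are hypothetical stand-ins, and collapsing several pending signals into a single fetch is an assumption made for illustration. It only shows the idea that the background side creates fetch requests when, and only when, the application side has enqueued a fetch signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class FetchEventSketch {

    /** Hypothetical marker event: the application thread asks for a fetch. */
    static final class FetchEvent { }

    /** Hypothetical stand-in for the background thread's fetch request manager. */
    static final class SketchFetchRequestManager {
        private final Queue<FetchEvent> events;
        private final List<String> sentRequests = new ArrayList<>();

        SketchFetchRequestManager(Queue<FetchEvent> events) {
            this.events = events;
        }

        /** One pass of the background loop; returns how many requests it created. */
        int poll(List<String> fetchablePartitions) {
            // Drain pending signals. Assumption: several signals collapse into one fetch.
            boolean signaled = false;
            while (events.poll() != null) {
                signaled = true;
            }
            if (!signaled) {
                // No signal from the application thread: do not fetch,
                // even though partitions are fetchable.
                return 0;
            }
            for (String partition : fetchablePartitions) {
                sentRequests.add("FETCH " + partition);
            }
            return fetchablePartitions.size();
        }

        List<String> sentRequests() {
            return sentRequests;
        }
    }

    public static void main(String[] args) {
        Queue<FetchEvent> events = new ConcurrentLinkedQueue<>();
        SketchFetchRequestManager manager = new SketchFetchRequestManager(events);
        List<String> fetchable = List.of("topic-0", "topic-1");

        // The background loop runs before the application has polled: nothing is sent.
        System.out.println(manager.poll(fetchable)); // prints 0

        // The application thread, inside its poll cycle, signals that a fetch is wanted.
        events.add(new FetchEvent());
        System.out.println(manager.poll(fetchable)); // prints 2

        // No new signal: the background loop stays idle again.
        System.out.println(manager.poll(fetchable)); // prints 0
    }
}
```

The sketch is single-threaded on purpose so that its behavior is deterministic; in the real consumer the two sides run on separate threads, which is where the remaining race window comes from.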
As mentioned above, this change does not completely solve the problem related to fetch session eviction. Exactly how to shut the window for the race condition completely is outside the scope of this change.
See KAFKA-17182.
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)