kafka-elasticsearch-injector icon indicating copy to clipboard operation
kafka-elasticsearch-injector copied to clipboard

Add deadline on consumer to send non-empty record batch

Open ggml1 opened this issue 3 years ago • 0 comments

Description

I have added a deadline for the consumer to flush non-empty batches to the storage service, which defaults to one minute without receiving new records.

This deadline is also configurable through the environment variable KAFKA_CONSUMER_BATCH_DEADLINE.

For the flush logic to be reusable in both select cases, I have split the send logic in a separate function.

Note: I had to run go mod tidy and import some packages again, because some dependencies were broken and I was unable to test or build in my local environment.

Why is this PR needed?

Closes #8.

Checklist

  • [ ] Branch is named according to standards (feature/*, fix/*, hotfix/*)
  • [x] Code compiles correctly
  • [ ] Created tests which fail without the change (if possible)
  • [x] All tests passing
  • [x] Extended the README / documentation, if necessary

ggml1 avatar Apr 23 '21 18:04 ggml1