kafka-elasticsearch-injector
kafka-elasticsearch-injector copied to clipboard
Add deadline on consumer to send non-empty record batch
Description
I have added a deadline for the consumer to flush non-empty batches to the storage service, which defaults to one minute without receiving new records.
This deadline is also configurable through the environment variable KAFKA_CONSUMER_BATCH_DEADLINE
.
For the flush logic to be reusable in both select cases, I have split the send logic in a separate function.
Note: I had to run go mod tidy
and import some packages again, because some dependencies were broken and I was unable to test or build in my local environment.
Why is this PR needed?
Closes #8.
Checklist
- [ ] Branch is named according to standards (
feature/*
,fix/*
,hotfix/*
) - [x] Code compiles correctly
- [ ] Created tests which fail without the change (if possible)
- [x] All tests passing
- [x] Extended the README / documentation, if necessary