Implement wrapper around `KafkaConsumer` to support batch consumption with head-of-line blocking
We have multiple use cases where Kafka records are assembled into batches before they're processed.
While batch semantics can be achieved with Kafka Streams, it introduces additional overhead:
- Record offsets become eligible for committing once a record successfully passes through a sub-topology
- It is thus not possible to delay offset commits until the batch is processed, as record offsets may be committed before that, risking message loss when processing the batch fails
- Assembling record batches requires Kafka Streams state stores (in-memory or RocksDB), which in turn necessitate a changelog topic for fault tolerance
- Batches can only be assembled per-partition, because each state store is bound to a single Kafka Streams task, which itself is bound to one partition
An example of trying to address such batching use cases in Kafka Streams can be seen here: https://github.com/DependencyTrack/hyades-apiserver/pull/305/files
For simple batching use cases, ideally it should work like this:
- Subscribe to N partitions of topic `foo`, poll records from all N partitions
- Put records into an in-memory batch, until a given max size is reached
- (Optionally send records that failed to deserialize to a dead-letter-topic)
- When the max batch size is reached, or a given timeout expires, "flush" / process the records (i.e. write to database, perform an HTTP call, ...)
- When flushing was successful, commit offsets
- When flushing was unsuccessful, do not commit offsets and either:
- Fail the consumer entirely in case of non-retryable errors
- Restart consumer from last committed offset in case of retryable errors
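As a rough sketch of the batch-assembly part of this flow (all names here are hypothetical, not existing code), the size / timeout flush conditions could look like this; in the real consumer, `shouldFlush()` would be checked after each `poll()`, and offsets would only be committed after `drain()`ing and successfully processing the batch:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory batch that becomes flushable when either a
// max size is reached or a linger timeout has elapsed since the first
// record was added.
final class RecordBatch<T> {
    private final int maxSize;
    private final Duration linger;
    private final List<T> records = new ArrayList<>();
    private Instant firstRecordAt;

    RecordBatch(final int maxSize, final Duration linger) {
        this.maxSize = maxSize;
        this.linger = linger;
    }

    void add(final T record) {
        if (records.isEmpty()) {
            firstRecordAt = Instant.now();
        }
        records.add(record);
    }

    boolean shouldFlush() {
        if (records.size() >= maxSize) {
            return true; // max batch size reached
        }
        // Non-empty batch that has been lingering for too long.
        return !records.isEmpty()
                && Instant.now().isAfter(firstRecordAt.plus(linger));
    }

    List<T> drain() {
        final List<T> drained = new ArrayList<>(records);
        records.clear();
        firstRecordAt = null;
        return drained;
    }
}
```

After a successful flush of the drained records, the consumer would call `commitSync()`; after a failed flush, it would not commit, matching the semantics above.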
Essentially, implement a batch consumer with head-of-line blocking.
HOL blocking semantics are often undesirable, but for certain cases they are useful:
- Assuming all records are flushed to the same "sink" (e.g. database), failure to flush one record (e.g. database is down) always implies others can't be flushed either. There is no point in proceeding to later records in the topic.
- Retries are simple: they just involve restarting from the last committed offset. No additional state-keeping is necessary, and no retry or changelog topic is required.
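These retry semantics can be sketched decoupled from Kafka (the "partition" below is a plain list, and the "committed offset" a simple index; all class and method names are illustrative, not a real implementation):

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical simulation of head-of-line-blocking consumption: the
// committed offset only advances once a batch was flushed successfully.
final class HolBlockingLoop {

    static class RetryableException extends RuntimeException {
    }

    static <T> void run(final List<T> partition, final int batchSize,
                        final Consumer<List<T>> flush, final int maxAttempts) {
        int committed = 0; // offset of the next unprocessed record
        int attempts = 0;
        while (committed < partition.size()) {
            final int end = Math.min(committed + batchSize, partition.size());
            final List<T> batch = partition.subList(committed, end);
            try {
                flush.accept(batch);
                committed = end; // flush succeeded: "commit" offsets
                attempts = 0;
            } catch (RetryableException e) {
                // Retryable error: do not advance the committed offset, so
                // the next iteration retries the exact same batch. With a
                // real KafkaConsumer this would be a seek() back to the
                // last committed offset.
                if (++attempts >= maxAttempts) {
                    throw e;
                }
            }
            // Non-retryable errors propagate and fail the consumer entirely.
        }
    }
}
```

The `maxAttempts` guard is optional; without it, a persistently failing sink would block the partition indefinitely, which is exactly the HOL-blocking trade-off described above.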
Areas where I think this might be useful:
- Ingestion of `ScanResult`s and vulnerabilities from the `dtrack.vuln-analysis.result` topic
- Ingestion of `AnalysisResult`s from the `dtrack.repo-meta-analysis.result` topic
- Buffering of `ScanResult`s when tracking vulnerability scan completion (see https://github.com/DependencyTrack/hyades-apiserver/pull/305/files)
- Ingestion of mirrored vulnerabilities from the `dtrack.vulnerability` topic
- Performing vulnerability analysis with Snyk or OSS Index, which support PURL batching
A few example implementations of batching that I found:
- https://github.com/apache/kafka/blob/3.6.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
- https://github.com/getsentry/batching-kafka-consumer/blob/master/batching_kafka_consumer/__init__.py
- https://github.com/benthosdev/benthos/blob/main/internal/impl/kafka/input_kafka_franz.go
I just realized that Confluent's Parallel Consumer is doing exactly that: https://github.com/DependencyTrack/hyades/issues/346
Update: Confluent Parallel Consumer has no batch timeout behavior (it doesn't wait for batches to become full). So ultimately we need to build our own batching consumer. But perhaps PC can be used as an intermediate solution in the meantime.
Closing, as consumers in the API server have been migrated to Confluent parallel-consumer.