
Implement wrapper around `KafkaConsumer` to support batch consumption with head-of-line blocking

Open nscuro opened this issue 2 years ago • 2 comments

We have multiple use cases where Kafka records are assembled into batches before they're processed.

While batch semantics can be achieved with Kafka Streams, it introduces additional overhead:

  • Record offsets become eligible for committing once a record successfully passes through a sub-topology
  • It is thus not possible to delay offset commits until the batch is processed, as record offsets may be committed before that, risking message loss when processing the batch fails
  • Assembling record batches requires Kafka Streams state stores (in-memory or RocksDB), which in turn necessitate a changelog topic for fault tolerance
  • Batches can only be assembled per-partition, because each state store is bound to a single Kafka Streams task, which itself is bound to one partition

An example of trying to address such batching use cases in Kafka Streams can be seen here: https://github.com/DependencyTrack/hyades-apiserver/pull/305/files

For simple batching use cases, ideally it should work like this:

  • Subscribe to N partitions of topic foo
  • Poll records from all N partitions
  • Put records into in-memory batch, until a given max size is reached
  • (Optionally send records that failed to deserialize to a dead-letter-topic)
  • When the max batch size or a given timeout is reached, "flush" / process the records (i.e. write to database, make an HTTP call, ...)
  • When flushing was successful, commit offsets
  • When flushing was unsuccessful, do not commit offsets and either:
    • Fail the consumer entirely in case of non-retryable errors
    • Restart the consumer from the last committed offset in case of retryable errors
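The size/timeout logic in the steps above can be kept separate from the Kafka plumbing, which also makes it unit-testable. A minimal sketch (the `Batcher` class and its method names are illustrative, not an existing Hyades API): a wrapper around `KafkaConsumer` would feed polled records into `add(...)`, check `shouldFlush(...)` after each `poll(...)`, and call `commitSync()` only once a flush succeeded.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Core batching logic, free of Kafka dependencies. A KafkaConsumer wrapper
// would add() polled records, flush when shouldFlush() returns true, and
// commit offsets only after the flush succeeded.
final class Batcher<T> {
    private final int maxSize;
    private final Duration maxWait;
    private final List<T> buffer = new ArrayList<>();
    private Instant firstRecordAt = null;

    Batcher(int maxSize, Duration maxWait) {
        this.maxSize = maxSize;
        this.maxWait = maxWait;
    }

    void add(T record) {
        if (buffer.isEmpty()) {
            firstRecordAt = Instant.now(); // start the "linger" clock
        }
        buffer.add(record);
    }

    // Flush when the batch is full, or when the oldest buffered record
    // has been waiting for longer than maxWait.
    boolean shouldFlush(Instant now) {
        if (buffer.isEmpty()) {
            return false;
        }
        if (buffer.size() >= maxSize) {
            return true;
        }
        return Duration.between(firstRecordAt, now).compareTo(maxWait) >= 0;
    }

    // Hand the batch to the caller for processing and reset the buffer.
    List<T> drain() {
        List<T> out = new ArrayList<>(buffer);
        buffer.clear();
        firstRecordAt = null;
        return out;
    }
}
```

Keeping the batch decision pure means the timeout path can be tested deterministically by passing in a clock value, instead of sleeping in tests.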

Essentially, implement a batch consumer with head-of-line blocking.

Head-of-line (HOL) blocking semantics are often undesirable, but for certain cases they are useful:

  • Assuming all records are flushed to the same "sink" (e.g. database), failure to flush one record (e.g. database is down) always implies others can't be flushed either. There is no point in proceeding to later records in the topic.
  • Retries are simple: they merely involve restarting from the last committed offset. No additional state keeping is necessary, and no retry or changelog topic is required.
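The retry semantics can be illustrated with a toy in-memory "topic" (names like `consume` and `flushOk` are made up for the sketch): the committed offset only advances when an entire batch flushes successfully; on failure the consumer re-reads the same records from the last committed offset, with no extra state.

```java
import java.util.List;
import java.util.function.Predicate;

// Toy illustration of head-of-line blocking: a flush failure blocks all
// later records, and retrying means re-reading from the committed offset.
final class HolBlockingDemo {
    // Processes records in batches of batchSize; flushOk simulates the sink
    // (e.g. a database write) succeeding or failing for a whole batch.
    // Returns the committed offset reached after at most maxAttempts
    // consecutive failures per batch.
    static int consume(List<String> topic, int batchSize,
                       Predicate<List<String>> flushOk, int maxAttempts) {
        int committed = 0;
        int attempts = 0;
        while (committed < topic.size() && attempts < maxAttempts) {
            int end = Math.min(committed + batchSize, topic.size());
            List<String> batch = topic.subList(committed, end);
            if (flushOk.test(batch)) {
                committed = end; // flush succeeded -> commit offsets
                attempts = 0;
            } else {
                attempts++;      // flush failed -> "seek" back, retry same batch
            }
        }
        return committed;
    }
}
```

In the real consumer, "restart from the committed offset" would simply mean not committing and letting the consumer re-poll after a seek or restart, so no record is lost if the sink was down.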

Areas where I think this might be useful:

  • Ingestion of ScanResults and vulnerabilities from the dtrack.vuln-analysis.result topic
  • Ingestion of AnalysisResults from the dtrack.repo-meta-analysis.result topic
  • Buffering of ScanResults when tracking vulnerability scan completion (see https://github.com/DependencyTrack/hyades-apiserver/pull/305/files)
  • Ingestion of mirrored vulnerabilities from the dtrack.vulnerability topic
  • Performing vulnerability analysis with Snyk or OSS Index which support PURL batching

nscuro avatar Nov 13 '23 19:11 nscuro

A few example implementations of batching that I found:

  • https://github.com/apache/kafka/blob/3.6.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
  • https://github.com/getsentry/batching-kafka-consumer/blob/master/batching_kafka_consumer/__init__.py
  • https://github.com/benthosdev/benthos/blob/main/internal/impl/kafka/input_kafka_franz.go

nscuro avatar Nov 13 '23 19:11 nscuro

I just realized that Confluent's Parallel Consumer is doing exactly that: https://github.com/DependencyTrack/hyades/issues/346

Update: Confluent Parallel Consumer has no batch timeout behavior (it doesn't wait for batches to fill up). So ultimately we'll need to build our own batching consumer, but perhaps PC can be used as an intermediary solution in the meantime.

nscuro avatar Nov 14 '23 11:11 nscuro

Closing, as consumers in the API server have been migrated to Confluent parallel-consumer.

nscuro avatar Jun 05 '24 10:06 nscuro