hyades icon indicating copy to clipboard operation
hyades copied to clipboard

Slow processors can slow down overall throughput when `num.stream.threads` < Number of Stream Tasks

Open nscuro opened this issue 2 years ago • 4 comments

As per Kafka Streams' threading model, a topology is broken down into stream tasks. The number of tasks created depends on how many sub-topologies there are, and from how many partitions those sub-topologies consume. One or more tasks are assigned to a single stream thread. The number of stream threads in an application instance is defined by kafka-streams.num.stream.threads.

If num.stream.threads is lower than the number of stream tasks generated for the topology, there will be some threads working on multiple tasks. In Kafka Streams, there is no way to influence how tasks are assigned. This could lead to situations where one thread is assigned to tasks from both:

  • A fast "upstream" task, responsible for ingesting input records (e.g. by consuming from dtrack.vuln-analysis.component)
  • A slow "downstream" task, responsible for processing records (e.g. the Snyk analyzer, capable of analyzing 3-4 records per second)

This means that the upstream task (capable of processing multiple hundreds of records per second) will be significantly delayed, which also has a negative impact on other tasks depending on its results.

In the screenshot below, consumers of the dtrack.vuln-analysis.component and dtrack.vuln-analysis.component.purl topics have a significant lag, despite the respective sub-topologies being capable of processing hundreds of records per second. This should not be the case. Instead, consumers of those topics should be able to keep up with new records in almost realtime.

image

For the vulnerability analyzer, a slow processor like Snyk can additionally lead to sub-optimal batching in other processors. Because fewer records arrive in a given time window, OSS Index will not be able to use the maximum batch size of 128 efficiently:

2023-01-09 10:18:54,317 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-2) Analyzing batch of 46 records
2023-01-09 10:18:54,797 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 48 records
2023-01-09 10:19:04,425 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-3) Analyzing batch of 23 records
2023-01-09 10:19:04,934 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 24 records
2023-01-09 10:19:05,954 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-2) Analyzing batch of 23 records
2023-01-09 10:19:11,077 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 12 records
2023-01-09 10:19:11,079 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-2) Analyzing batch of 11 records
2023-01-09 10:19:11,896 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-3) Analyzing batch of 19 records
2023-01-09 10:19:16,812 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-2) Analyzing batch of 12 records
2023-01-09 10:19:17,068 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 13 records
2023-01-09 10:19:21,825 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-2) Analyzing batch of 12 records
2023-01-09 10:19:21,826 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-3) Analyzing batch of 26 records
2023-01-09 10:19:22,072 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 8 records
2023-01-09 10:19:26,887 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-3) Analyzing batch of 8 records
2023-01-09 10:19:27,235 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-2) Analyzing batch of 8 records
2023-01-09 10:19:31,943 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 19 records
2023-01-09 10:19:31,996 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-3) Analyzing batch of 12 records
2023-01-09 10:19:36,983 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 9 records

This is not an issue when there is one stream thread per stream task. For example, when both OSS Index and Snyk are enabled, but the internal analyzer is disabled, there are 18 partitions the application consumes from, leading to 18 stream tasks. Spawning one application instance with num.stream.threads=18, or three instances with num.stream.threads=6, leads to optimal processing conditions.

Effectively, scaling up is unproblematic, but scaling down always comes with the danger of significantly slowing down the entire system.

Possible solutions:

  • Move each analyzer into their own microservice (more operational overhead)
  • Start multiple KafkaStreams instances instead of just one (Quarkus only supports one KafkaStreams per application, we'd need more code to make this work, and we have to give up the dev mode integration)

nscuro avatar Jan 09 '23 11:01 nscuro

The problem is also documented here: https://medium.com/@andy.bryant/kafka-streams-work-allocation-4f31c24753cc

nscuro avatar Jan 09 '23 13:01 nscuro

Using Smallrye reactive messaging would linder this pain, but introduce others. With SRM, consumer threads can be scaled independently. Major downsides are that we'd loose the convenience of Kafka-backed state stores (stateful processing / "checkpointing" requires Redis or database access), also the batching functionality provided by Smallrye is sub-optimal as it returns too few records most of the time.

nscuro avatar Jan 09 '23 18:01 nscuro

Wondering how much this actually matters considering that a "completed" analysis would require results from all scanners anyway. Makes it questionable whether OSS Index being significantly faster than Snyk makes a noticeable difference in the end-to-end use case. If multiple analyzers are enabled, the slowest of them will always be the bottleneck, no matter how fast all others are.

The effect will also be less noticeable once caching kicks in.

nscuro avatar Jan 10 '23 11:01 nscuro

SRM behaves in very unintuitive ways:

  • Consuming from multiple partitions of a topic in parallel is supported, unless @Blocking is used
    • @Blocking is required for synchronous things, e.g. database interactions and REST calls
    • Per default @Blocking causes messages to be processed on one vert-x worker thread
      • When consuming from 3 partitions, I'd have expected 3 worker pools to be used, but that is not happening
      • Ordered processing is super slow, scaling up would require spawning more application instances (no way to increase concurrency in a single instance)
    • Processing can be extended to multiple vert-x worker threads by using @Blocking(ordered = false)
    • With unordered processing, messages from all partitions are thrown into a single worker pool
      • Negates the benefits of choosing PURL as record key, as multiple threads may now process the same PURL again
    • Requires usage of throttled commit strategy, which can cause processed record offsets to not be committed at all (when the application is shut down), or commit offsets even for records that have not been successfully processed
  • The reactive programming and threading model is really painful to use and understand

nscuro avatar Jan 10 '23 14:01 nscuro