too many threads micrometer-kafka

Open lmcdasi opened this issue 2 years ago • 6 comments

Describe the bug

I have a Spring Boot Kafka consumer/producer app, and I have enabled the Kafka Micrometer metrics to be integrated with Prometheus. The Kafka consumer executor was left at the default. During execution, running kill -QUIT ${java_pid} and grepping the resulting thread dump for Micrometer and Kafka threads showed:

$ grep '"micrometer-kafka-metrics' log_file | wc -l
784
$ grep '"consumer-0-C-1' log_file | wc -l
784

The number of Micrometer threads matched the number of Kafka consumer threads.

All of them are in the TIMED_WAITING state:

"micrometer-kafka-metrics" #35 daemon prio=5 os_prio=0 cpu=27.24ms elapsed=490.80s tid=0x00007f51794d5800 nid=0x2e waiting on condition  [0x00007f516e82e000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
        - parking to wait for  <0x0000000716060e68> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos([email protected]/LockSupport.java:234)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos([email protected]/AbstractQueuedSynchronizer.java:2123)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take([email protected]/ScheduledThreadPoolExecutor.java:1182)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take([email protected]/ScheduledThreadPoolExecutor.java:899)
        at java.util.concurrent.ThreadPoolExecutor.getTask([email protected]/ThreadPoolExecutor.java:1054)
        at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1114)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run([email protected]/Thread.java:829)

I have changed the code to use a ThreadPool executor for the Kafka container. The Kafka thread count is now low:

$ grep '"kafkaContainerExecutor' log_file | wc -l
7

But the Micrometer thread count is still high:

$ grep '"micrometer-' log_file | wc -l
69

I guess this is the code that triggers it, in KafkaMetrics.java:

private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("micrometer-kafka-metrics"));

Would it be possible to use a single ThreadPool executor for all KafkaMetrics instances instead? Or at least to be able to customize that executor?
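
To illustrate why the thread count tracks the number of instances, here is a minimal JDK-only sketch. It does not use Micrometer: OwnSchedulerBinder, countLiveThreads, threadsFor and the "demo-metrics" thread name are hypothetical stand-ins chosen for this example. The only thing it assumes from the issue is that each instance holds its own Executors.newSingleThreadScheduledExecutor(...), as in the field quoted above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PerInstanceSchedulerDemo {

    /** Hypothetical stand-in for a binder that owns its own single-thread scheduler. */
    static final class OwnSchedulerBinder implements AutoCloseable {
        private final ScheduledExecutorService scheduler;

        OwnSchedulerBinder(String threadName) {
            this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, threadName);
                t.setDaemon(true);
                return t;
            });
        }

        void start() {
            // Scheduling the first task makes the executor start its worker thread.
            scheduler.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.SECONDS);
        }

        @Override
        public void close() {
            scheduler.shutdownNow();
        }
    }

    /** Counts live threads with exactly this name, similar to grepping a thread dump. */
    static long countLiveThreads(String name) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().equals(name))
                .count();
    }

    /** Starts the given number of binders and reports how many worker threads exist. */
    static long threadsFor(int instances, String threadName) {
        List<OwnSchedulerBinder> binders = new ArrayList<>();
        try {
            for (int i = 0; i < instances; i++) {
                OwnSchedulerBinder binder = new OwnSchedulerBinder(threadName);
                binder.start();
                binders.add(binder);
            }
            return countLiveThreads(threadName);
        } finally {
            binders.forEach(OwnSchedulerBinder::close);
        }
    }

    public static void main(String[] args) {
        System.out.println("threads for 10 instances: " + threadsFor(10, "demo-metrics"));
    }
}
```

Each instance contributes one mostly idle worker thread parked in the scheduler's delay queue, which is the same shape as the TIMED_WAITING stack trace above.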

Environment

  • Micrometer version [io.micrometer:micrometer-core:jar:1.7.7:compile]
  • Micrometer registry [e.g. io.micrometer:micrometer-registry-prometheus:jar:1.7.7:compile]
  • OS: [linux alpine]
  • Java version: [java zulu 11]

lmcdasi avatar Mar 09 '22 11:03 lmcdasi

I have enabled the Kafka Micrometer metrics

Could you elaborate more on how you are configuring things? Are you not using the Spring Boot auto-configuration for Spring Kafka metrics?

shakuzen avatar Mar 10 '22 05:03 shakuzen

In the app I do not see any specific setup. Searching the Java code, the app does not seem to have any reference to MicrometerProducerListener. It uses org.springframework.boot:spring-boot:jar:2.5.8:compile, so any configuration must be done by Spring behind the scenes.

In the yaml file I do have:

management:
....
  metrics:
    export:
      prometheus:
        enabled: true
....
  endpoints:
    web:
      exposure:
        include: 'health,loggers,prometheus,threaddump'
...
  endpoint:
    metrics:
      enabled: true

spring:
  kafka:
    bootstrap-servers: ${KAFKA_SERVER:${LOCALHOST_KAFKA}}
#    properties:
#      sasl.jaas.config:
#      sasl.mechanism: PLAIN
#      security.protocol: SASL_SSL
    consumer:
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      group-id: event-subscription-manager
      properties:
        # The values below are the ones recommended by Microsoft - see jira XBIOMETRICS-15783 & XSUPPORT-2741
        connections.max.idle.ms: 180000
        metadata.max.age.ms: 180000
        session.timeout.ms: 30000
        max.poll.interval.ms: 300000

lmcdasi avatar Mar 10 '22 11:03 lmcdasi

KafkaMetrics creates one thread per client (consumer/producer), how many clients do you create/bind KafkaMetrics to?

Could you please provide us a minimal sample project to reproduce this issue so we can more easily investigate and ensure any fix is working properly for your use case?

jonatan-ivanov avatar Mar 23 '22 19:03 jonatan-ivanov

I changed the code of the app to use a thread pool. That solved the above: the consumer threads are now limited, as are the Micrometer ones. Due to workload I do not have time to create an app that reproduces the issue, but I can summarize the steps the app performs, if that's good enough. Or maybe I can attempt to strip the business content out of that file and provide it here. Thanks.

lmcdasi avatar Mar 23 '22 23:03 lmcdasi

....... when running a high number of 'onRequest' calls, I end up with too many Kafka consumer threads, along with the Micrometer ones. If I switch to a thread pool executor, it gets under control. Without the thread pool, kafkaListenerContainer.start(); will start a new thread each time a request is performed, which I guess translates to one Micrometer thread ....

public void onRequest(final ConsumeEventsRequest request, final StreamObserver<ConsumeEventsResponse> responseObserver) {
.......
            final Optional<EventStream> eventStreamOptional = eventStreamRepository.findById(eventStreamId);
            if (eventStreamOptional.isPresent()) {
                final EventStream eventStream = eventStreamOptional.get();

                final TopicPartitionOffset partitionOffset = new TopicPartitionOffset(
                        eventStream.getTopic(), eventStream.getPartition(), TopicPartitionOffset.SeekPosition.END);

                final AbstractMessageListenerContainer<String, byte[]> kafkaListenerContainer =
                        kafkaContainerFactory.createContainer(partitionOffset);
                final KafkaMessageListener kafkaMessageListener = new KafkaMessageListener(serverStreamObserver, request, eventStreamId);
                kafkaListenerContainer.setupMessageListener(kafkaMessageListener);

                serverStreamObserver.setOnCancelHandler(new OnCancelHandler(eventStreamId));

                kafkaListenerContainer.start();
                logger.info("Message listener container started on topic {} with partition {}",
                        eventStream.getTopic(), eventStream.getPartition());
                .....

lmcdasi avatar Mar 26 '22 16:03 lmcdasi

@lmcdasi Your example does not show your Micrometer usage which would be great to see in this case. My guess is that you create a lot of KafkaMetrics instances but the intended use-case is having one KafkaMetrics binder per client and one client per broker for the whole lifecycle of the application.

jonatan-ivanov avatar May 06 '22 23:05 jonatan-ivanov

I have 3 thousand consumers, and Micrometer creates 3k+ 'micrometer-kafka-metrics' threads. Can we add the ability to set up our own thread pool for KafkaClientMetrics?

eof404 avatar May 11 '23 03:05 eof404

@eof404 Do you have 3k Kafka clients? Does this help? https://github.com/micrometer-metrics/micrometer/issues/3060#issuecomment-1120072271

jonatan-ivanov avatar May 12 '23 21:05 jonatan-ivanov

I'm using spring-kafka. It seems the problem is here: https://github.com/spring-projects/spring-kafka/blob/c2912f1852e8c9e79b7d72200e38f6a1b2fa1c3b/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerConsumerListener.java#L73

MicrometerConsumerListener creates new KafkaClientMetrics for each consumer

eof404 avatar May 15 '23 04:05 eof404

Assigning a KafkaClientMetrics instance to every Kafka Client is by design as of now but could you please answer the question above: Do you have 3k Kafka clients?

jonatan-ivanov avatar May 16 '23 18:05 jonatan-ivanov

Hi! Yes, I have that many instances of KafkaConsumer, and every instance adds one 'micrometer-kafka-metrics' thread.

Maybe we could create a limited number of 'micrometer-kafka-metrics' threads, regardless of the number of KafkaConsumer instances?

eof404 avatar May 17 '23 03:05 eof404

We do not anticipate thousands of Kafka client instances in an application. Adding a parameter to inject a thread pool might seem like a simple fix, but it introduces risk for future changes, for a use-case that we don't expect to happen. If we did this and Kafka Metrics later changed so that it no longer needed a thread pool (fyi: it used to use JMX), we would need to put everything that publicly surfaces that thread pool through a deprecation cycle, which is less than ideal for a use-case that should not really happen.

As a workaround, I think you can override the bindTo and close methods of KafkaMetrics and make it possible to use the pool you want, but as the fix, I would recommend using one (or just a couple of) Kafka clients if possible.

With this, let me close this issue. Please let us know if you think I misunderstood something and you want us to reopen it.
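
The general shape of such a workaround can be sketched with JDK classes only. This is not Micrometer code and does not show the real bindTo/close overrides: PooledBinder, newSharedPool, threadsFor and the thread-name prefix are hypothetical names used for illustration, and the actual methods available to override should be checked against the Micrometer version in use. The idea is that every binder schedules its refresh task on one shared ScheduledExecutorService, and closing a binder cancels only its own task rather than shutting the pool down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SharedSchedulerSketch {

    /** Hypothetical binder that borrows a shared scheduler instead of owning one. */
    static final class PooledBinder implements AutoCloseable {
        private final ScheduledExecutorService shared;
        private final Runnable refresh;
        private ScheduledFuture<?> task;

        PooledBinder(ScheduledExecutorService shared, Runnable refresh) {
            this.shared = shared;
            this.refresh = refresh;
        }

        /** Registers the periodic refresh on the shared pool. */
        void bind(long periodMillis) {
            task = shared.scheduleAtFixedRate(refresh, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        }

        /** Cancels only this binder's task; the shared pool keeps running for the others. */
        @Override
        public void close() {
            if (task != null) {
                task.cancel(false);
            }
        }
    }

    /** Creates a fixed-size scheduled pool with daemon threads named prefix-1, prefix-2, ... */
    static ScheduledExecutorService newSharedPool(int size, String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return Executors.newScheduledThreadPool(size, r -> {
            Thread t = new Thread(r, prefix + "-" + seq.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
    }

    /** Binds many binders to one pool and reports how many pool threads exist. */
    static long threadsFor(int binders, int poolSize, String prefix) {
        ScheduledExecutorService pool = newSharedPool(poolSize, prefix);
        List<PooledBinder> bound = new ArrayList<>();
        try {
            for (int i = 0; i < binders; i++) {
                PooledBinder binder = new PooledBinder(pool, () -> { });
                binder.bind(1000);
                bound.add(binder);
            }
            return Thread.getAllStackTraces().keySet().stream()
                    .filter(t -> t.getName().startsWith(prefix + "-"))
                    .count();
        } finally {
            bound.forEach(PooledBinder::close);
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool threads for 3000 binders: " + threadsFor(3000, 2, "shared-metrics"));
    }
}
```

With this arrangement the thread count is bounded by the pool size rather than by the number of clients. The trade-off is that a slow refresh task can delay the others sharing the pool, so the pool size and refresh period would need tuning for a real workload.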

jonatan-ivanov avatar May 17 '23 20:05 jonatan-ivanov