micrometer
too many threads micrometer-kafka
Describe the bug
I have a Spring Boot Kafka consumer/producer app and I have enabled the Kafka Micrometer metrics so they can be integrated with Prometheus. The Kafka consumer executor was left as the default one. During execution, running kill -QUIT ${java_pid} and grepping the thread dump for Micrometer and Kafka threads showed:
$ grep '"micrometer-kafka-metrics' log_file | wc -l
784
$ grep '"consumer-0-C-1' log_file | wc -l
784
So the number of Micrometer threads matches the number of Kafka consumer threads.
All of the Micrometer threads are in TIMED_WAITING:
"micrometer-kafka-metrics" #35 daemon prio=5 os_prio=0 cpu=27.24ms elapsed=490.80s tid=0x00007f51794d5800 nid=0x2e waiting on condition [0x00007f516e82e000]
java.lang.Thread.State: TIMED_WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for <0x0000000716060e68> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos([email protected]/LockSupport.java:234)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos([email protected]/AbstractQueuedSynchronizer.java:2123)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take([email protected]/ScheduledThreadPoolExecutor.java:1182)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take([email protected]/ScheduledThreadPoolExecutor.java:899)
at java.util.concurrent.ThreadPoolExecutor.getTask([email protected]/ThreadPoolExecutor.java:1054)
at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1114)
at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
at java.lang.Thread.run([email protected]/Thread.java:829)
I have changed the code to use a thread pool executor for the Kafka container. While the number of Kafka threads is now low:
$ grep '"kafkaContainerExecutor' log_file | wc -l
7
the number of Micrometer threads is still high:
$ grep '"micrometer-' log_file | wc -l
69
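For reference, the same kind of count can be taken from inside the JVM instead of grepping a thread dump. This is only a minimal sketch using the JDK; the class name ThreadNameCounter is made up for illustration and is not part of Micrometer or Spring:

```java
final class ThreadNameCounter {

    /** Counts live threads whose name starts with the given prefix. */
    static long countByPrefix(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().startsWith(prefix))
                .count();
    }

    public static void main(String[] args) {
        // The prefix is the thread name seen in the dump above.
        System.out.println("micrometer-kafka-metrics threads: "
                + countByPrefix("micrometer-kafka-metrics"));
    }
}
```

Logging this number periodically would show whether the thread count grows with each started listener container.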
I guess this is the code that triggers it, in KafkaMetrics.java:
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("micrometer-kafka-metrics"));
Would it be possible to use a shared thread pool executor for all KafkaMetrics instances instead? Or at least to be able to customize that executor?
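To illustrate the difference being asked about, here is a rough JDK-only sketch. It does not use Micrometer at all: SchedulerThreadDemo, the thread names and the no-op tasks are assumptions made for the example. It compares one single-thread scheduler per simulated client with one bounded pool shared by all of them:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

final class SchedulerThreadDemo {

    // Daemon threads with a fixed name, so they are easy to count.
    private static ThreadFactory daemonFactory(String name) {
        return r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        };
    }

    private static long liveThreads(String name) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().equals(name))
                .count();
    }

    /** One scheduler per simulated client; returns how many threads that started. */
    static long perClientSchedulers(int clients) {
        List<ScheduledExecutorService> schedulers = new ArrayList<>();
        for (int i = 0; i < clients; i++) {
            ScheduledExecutorService s =
                    Executors.newSingleThreadScheduledExecutor(daemonFactory("demo-per-client"));
            // Scheduling the first task is what makes the executor start its worker thread.
            s.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.MINUTES);
            schedulers.add(s);
        }
        long count = liveThreads("demo-per-client");
        schedulers.forEach(ScheduledExecutorService::shutdownNow);
        return count;
    }

    /** One bounded pool for all simulated clients; returns how many threads that started. */
    static long sharedScheduler(int clients, int poolSize) {
        ScheduledExecutorService shared =
                Executors.newScheduledThreadPool(poolSize, daemonFactory("demo-shared"));
        for (int i = 0; i < clients; i++) {
            shared.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.MINUTES);
        }
        long count = liveThreads("demo-shared");
        shared.shutdownNow();
        return count;
    }

    public static void main(String[] args) {
        System.out.println("per-client schedulers: " + perClientSchedulers(50) + " threads");
        System.out.println("shared pool of 2:      " + sharedScheduler(50, 2) + " threads");
    }
}
```

In the first case the thread count grows with the number of clients; in the second it is capped by the pool size, at the cost of all periodic tasks competing for the same few threads.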
Environment
- Micrometer version [io.micrometer:micrometer-core:jar:1.7.7:compile]
- Micrometer registry [e.g. io.micrometer:micrometer-registry-prometheus:jar:1.7.7:compile]
- OS: [linux alpine]
- Java version: [java zulu 11]
"I have enabled the Kafka Micrometer metrics"
Could you elaborate more on how you are configuring things? Are you not using the Spring Boot auto-configuration for Spring Kafka metrics?
I do not see any specific setup in the app. It does not seem to have any reference to MicrometerProducerListener when searching the Java code. It uses org.springframework.boot:spring-boot:jar:2.5.8:compile, so any configuration must be done by Spring behind the scenes.
In the yaml file I do have:
management:
  ....
  metrics:
    export:
      prometheus:
        enabled: true
  ....
  endpoints:
    web:
      exposure:
        include: 'health,loggers,prometheus,threaddump'
  ...
  endpoint:
    metrics:
      enabled: true
spring:
  kafka:
    bootstrap-servers: ${KAFKA_SERVER:${LOCALHOST_KAFKA}}
    # properties:
    #   sasl.jaas.config:
    #   sasl.mechanism: PLAIN
    #   security.protocol: SASL_SSL
    consumer:
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      group-id: event-subscription-manager
      properties:
        # The below values are the ones recommended by Microsoft - see jira XBIOMETRICS-15783 & XSUPPORT-2741
        connections.max.idle.ms: 180000
        metadata.max.age.ms: 180000
        session.timeout.ms: 30000
        max.poll.interval.ms: 300000
KafkaMetrics creates one thread per client (consumer/producer). How many clients do you create/bind KafkaMetrics to?
Could you please provide us a minimal sample project to reproduce this issue so we can more easily investigate and ensure any fix is working properly for your use case?
I changed the code of the app to use a thread pool. That solved the above: the consumer threads got limited, as well as the Micrometer ones. Due to workload I do not have the time to create an app that reproduces the issue, but I can summarize the steps that the app is doing, if that's good enough. Or maybe I can attempt to strip the business content out of that file and provide it here. Thanks.
....... when running a high number of 'onRequest' calls, I end up having too many Kafka consumer threads along with the Micrometer ones. If I switch to a thread pool executor, it gets under control. Without a thread pool, kafkaListenerContainer.start(); will start a new thread each time a request is performed, which I guess translates to one Micrometer thread ....
public void onRequest(final ConsumeEventsRequest request, final StreamObserver<ConsumeEventsResponse> responseObserver) {
    .......
    final Optional<EventStream> eventStreamOptional = eventStreamRepository.findById(eventStreamId);
    if (eventStreamOptional.isPresent()) {
        final EventStream eventStream = eventStreamOptional.get();
        final TopicPartitionOffset partitionOffset = new TopicPartitionOffset(
                eventStream.getTopic(), eventStream.getPartition(), TopicPartitionOffset.SeekPosition.END);
        // A new listener container is created for every request.
        final AbstractMessageListenerContainer<String, byte[]> kafkaListenerContainer =
                kafkaContainerFactory.createContainer(partitionOffset);
        final KafkaMessageListener kafkaMessageListener = new KafkaMessageListener(serverStreamObserver, request, eventStreamId);
        kafkaListenerContainer.setupMessageListener(kafkaMessageListener);
        serverStreamObserver.setOnCancelHandler(new OnCancelHandler(eventStreamId));
        // Starting the container starts a consumer thread for it.
        kafkaListenerContainer.start();
        logger.info("Message listener container started on topic {} with partition {}",
                eventStream.getTopic(), eventStream.getPartition());
        .....
@lmcdasi Your example does not show your Micrometer usage, which would be great to see in this case. My guess is that you create a lot of KafkaMetrics instances, but the intended use case is having one KafkaMetrics binder per client and one client per broker for the whole lifecycle of the application.
I have 3 thousand consumers, and Micrometer creates 3k+ 'micrometer-kafka-metrics' threads. Can we add the ability to set up our own thread pool for KafkaClientMetrics?
@eof404 Do you have 3k Kafka clients? Does this help? https://github.com/micrometer-metrics/micrometer/issues/3060#issuecomment-1120072271
I'm using spring-kafka. It seems the problem is here: https://github.com/spring-projects/spring-kafka/blob/c2912f1852e8c9e79b7d72200e38f6a1b2fa1c3b/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerConsumerListener.java#L73
MicrometerConsumerListener creates new KafkaClientMetrics for each consumer
Assigning a KafkaClientMetrics instance to every Kafka client is by design as of now, but could you please answer the question above: do you have 3k Kafka clients?
Hi! Yes, I have that many instances of KafkaConsumer. And every instance adds one 'micrometer-kafka-metrics' thread.
Maybe we could create a limited number of 'micrometer-kafka-metrics' threads, regardless of the number of KafkaConsumer instances?
We do not anticipate thousands of Kafka client instances in an application. Adding a parameter to inject a thread pool might seem like a simple fix, but it introduces risk for future changes in order to support a use case that we don't expect to happen. If we did this and Kafka Metrics later changed so that it no longer needed a thread pool (FYI: it used to use JMX), we would need to put everything that publicly surfaces that thread pool through a deprecation cycle, which is less than ideal for a use case that should not really happen.
As a workaround, I think you can override the bindTo and close methods of KafkaMetrics and make it possible to use the pool you want, but as the fix, I would recommend using one (or just a couple of) Kafka clients if possible.
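The general shape of such an override could look like the sketch below. To be clear, it does not touch Micrometer: OwnSchedulerBinder is a hypothetical stand-in for a binder that owns its scheduler, and SharedPoolBinder, the bindTo(long) signature and refresh() are assumptions made only to show the pattern. The real KafkaMetrics methods have different signatures and do more work.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a binder that owns its scheduler; NOT Micrometer's KafkaMetrics.
class OwnSchedulerBinder implements AutoCloseable {
    // The executor is created eagerly, but it only starts a thread once a task is scheduled.
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    protected final AtomicInteger refreshCount = new AtomicInteger();

    // Placeholder for the periodic work (for example, re-checking which metrics exist).
    protected void refresh() {
        refreshCount.incrementAndGet();
    }

    public void bindTo(long periodMillis) {
        scheduler.scheduleAtFixedRate(this::refresh, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}

// Subclass that reroutes the periodic work onto a pool shared by all instances.
class SharedPoolBinder extends OwnSchedulerBinder {
    private final ScheduledExecutorService sharedPool;
    private ScheduledFuture<?> task;

    SharedPoolBinder(ScheduledExecutorService sharedPool) {
        this.sharedPool = sharedPool;
    }

    @Override
    public void bindTo(long periodMillis) {
        // Deliberately does not call super.bindTo(), so the per-instance scheduler stays idle.
        task = sharedPool.scheduleAtFixedRate(this::refresh, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        // Cancel only this instance's task; the shared pool belongs to the application.
        if (task != null) {
            task.cancel(false);
        }
        super.close(); // shuts down the idle per-instance scheduler, which never started a thread
    }

    boolean isCancelled() {
        return task != null && task.isCancelled();
    }
}
```

The point of the close override is that closing one binder must not shut down the pool that the other binders still use; the application that created the pool is responsible for shutting it down.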
With this, let me close this issue, please let us know if you think I misunderstood something and you want us to reopen it.