akka-persistence-kafka
akka-persistence-kafka copied to clipboard
Lots of CLOSE_WAIT connections
trafficstars
Hello, I am using latest version akka-persistence-kafka for persisting state of one kind of Actor (but very hot Actor), and I noticed lots CLOSE_WAIT sockets, and as application starts, it's number grows:
netstat -ant | grep CLOSE_WAIT
tcp6 1 0 172.31.39.249:33181 172.31.39.249:9092 CLOSE_WAIT
tcp6 1 0 172.31.39.249:60979 172.31.39.249:9092 CLOSE_WAIT
tcp6 1 0 172.31.39.249:33271 172.31.39.249:9092 CLOSE_WAIT
tcp6 1 0 172.31.39.249:59335 172.31.39.249:9092 CLOSE_WAIT
tcp6 1 0 172.31.39.249:32958 172.31.39.249:9092 CLOSE_WAIT
tcp6 1 0 172.31.39.249:33150 172.31.39.249:9092 CLOSE_WAIT
tcp6 1 0 172.31.39.249:60964 172.31.39.249:9092 CLOSE_WAIT
tcp6 1 0 172.31.39.249:33153 172.31.39.249:9092 CLOSE_WAIT
tcp6 1 0 172.31.39.249:32939 172.31.39.249:9092 CLOSE_WAIT
tcp6 1 0 172.31.39.249:33285 172.31.39.249:9092 CLOSE_WAIT
tcp6 1 0 172.31.39.249:60968 172.31.39.249:9092 CLOSE_WAIT
tcp6 1 0 172.31.39.249:33287 172.31.39.249:9092 CLOSE_WAIT
tcp6 1 0 172.31.39.249:32967 172.31.39.249:9092 CLOSE_WAIT
There is nothing except akka-persistence who connects to kafka, looks like a bug.
Just noticed that some of them are, tough, being closed eventually, but not all. Is it ok for kafka client to keep connections in half-closed state for tens of minutes?
Sometimes I see in logs the following exception, it may be related:
Error processing "kafka.producer":type="ProducerTopicMetrics",name="-AllTopicsMessagesPerSec"
javax.management.InstanceNotFoundException: "kafka.producer":type="ProducerTopicMetrics",name="-AllTopicsMessagesPerSec"
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095) ~[na:1.8.0_45]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427) ~[na:1.8.0_45]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415) ~[na:1.8.0_45]
at com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546) ~[na:1.8.0_45]
at com.yammer.metrics.reporting.JmxReporter.registerBean(JmxReporter.java:462) ~[metrics-core-2.2.0.jar:na]
at com.yammer.metrics.reporting.JmxReporter.processMeter(JmxReporter.java:412) ~[metrics-core-2.2.0.jar:na]
at com.yammer.metrics.reporting.JmxReporter.processMeter(JmxReporter.java:16) ~[metrics-core-2.2.0.jar:na]
at com.yammer.metrics.core.Meter.processWith(Meter.java:131) ~[metrics-core-2.2.0.jar:na]
at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395) ~[metrics-core-2.2.0.jar:na]
at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516) [metrics-core-2.2.0.jar:na]
at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491) [metrics-core-2.2.0.jar:na]
at com.yammer.metrics.core.MetricsRegistry.newMeter(MetricsRegistry.java:240) [metrics-core-2.2.0.jar:na]
at kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51) [kafka_2.11-0.8.2-beta.jar:na]
at kafka.producer.ProducerTopicMetrics.newMeter(ProducerTopicStats.scala:26) [kafka_2.11-0.8.2-beta.jar:na]
at kafka.producer.ProducerTopicMetrics.<init>(ProducerTopicStats.scala:27) [kafka_2.11-0.8.2-beta.jar:na]
at kafka.producer.ProducerTopicStats.<init>(ProducerTopicStats.scala:39) [kafka_2.11-0.8.2-beta.jar:na]
at kafka.producer.ProducerTopicStatsRegistry$$anonfun$2.apply(ProducerTopicStats.scala:52) [kafka_2.11-0.8.2-beta.jar:na]
at kafka.producer.ProducerTopicStatsRegistry$$anonfun$2.apply(ProducerTopicStats.scala:52) [kafka_2.11-0.8.2-beta.jar:na]
at kafka.utils.Pool.getAndMaybePut(Pool.scala:61) [kafka_2.11-0.8.2-beta.jar:na]
at kafka.producer.ProducerTopicStatsRegistry$.getProducerTopicStats(ProducerTopicStats.scala:56) [kafka_2.11-0.8.2-beta.jar:na]
at kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:49) [kafka_2.11-0.8.2-beta.jar:na]
at kafka.producer.Producer.<init>(Producer.scala:59) [kafka_2.11-0.8.2-beta.jar:na]
at akka.persistence.kafka.snapshot.KafkaSnapshotStore$$anonfun$saveAsync$1.apply$mcV$sp(KafkaSnapshotStore.scala:104) [akka-persistence-kafka_2.11-0.3.4.jar:0.3.4]
at akka.persistence.kafka.snapshot.KafkaSnapshotStore$$anonfun$saveAsync$1.apply(KafkaSnapshotStore.scala:101) [akka-persistence-kafka_2.11-0.3.4.jar:0.3.4]
at akka.persistence.kafka.snapshot.KafkaSnapshotStore$$anonfun$saveAsync$1.apply(KafkaSnapshotStore.scala:101) [akka-persistence-kafka_2.11-0.3.4.jar:0.3.4]
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) [scala-library-2.11.6.jar:na]
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) [scala-library-2.11.6.jar:na]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [akka-actor_2.11-2.3.11.jar:na]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) [akka-actor_2.11-2.3.11.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.6.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.6.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.6.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.6.jar:na]