ratis
ratis copied to clipboard
RATIS-1705. Fix metrics leak
What changes were proposed in this pull request?
Relevant code: https://github.com/apache/ratis/blob/ab9fc535dc6f48ba1d93687fc46022989beb6b0a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java#L61-L72
When `MetricRegistriesImpl#create` is called concurrently from multiple threads, `enableJmxReporter()` may be executed more than once. This happens because the `reporterRegistrations.isEmpty()` check and the registration that follows it are not performed under a lock, so several threads can all observe an empty list.
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/RATIS-1705
(Please replace this section with the link to the Apache JIRA.)
How was this patch tested?
With this patch applied, the bug described in RATIS-1705 no longer reproduces: https://issues.apache.org/jira/browse/RATIS-1705
@xichen01, thanks a lot for reporting the bug and fixing it! Since the lists won't change frequently, how about adding `synchronized` to all the methods and using `ArrayList` instead of `CopyOnWriteArrayList`? For example:
diff --git a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java
index cbe8d7bf..a459509d 100644
--- a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java
+++ b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java
@@ -17,12 +17,12 @@
*/
package org.apache.ratis.metrics.impl;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import org.apache.ratis.metrics.MetricRegistries;
@@ -41,10 +41,36 @@ public class MetricRegistriesImpl extends MetricRegistries {
private static final Logger LOG = LoggerFactory.getLogger(MetricRegistriesImpl.class);
- private final List<Consumer<RatisMetricRegistry>> reporterRegistrations = new CopyOnWriteArrayList<>();
+ class Reporters {
+ private final List<Consumer<RatisMetricRegistry>> registrations = new ArrayList<>();
+ private final List<Consumer<RatisMetricRegistry>> stoppers = new ArrayList<>();
- private final List<Consumer<RatisMetricRegistry>> stopReporters = new CopyOnWriteArrayList<>();
+ synchronized RatisMetricRegistry create(MetricRegistryInfo info) {
+ System.out.println("isEmpty? " + registrations.isEmpty());
+ if (registrations.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("First MetricRegistry has been created without registering reporters. "
+ + "Hence registering JMX reporter by default.");
+ }
+ enableJmxReporter();
+ }
+
+ final RatisMetricRegistry registry = factory.create(info);
+ registrations.forEach(reg -> reg.accept(registry));
+ return registry;
+ }
+ synchronized void add(Consumer<RatisMetricRegistry> registration, Consumer<RatisMetricRegistry> stopper) {
+ registrations.add(registration);
+ stoppers.add(stopper);
+ }
+
+ synchronized void remove(RatisMetricRegistry registry) {
+ stoppers.forEach(reg -> reg.accept(registry));
+ }
+ }
+
+ private final Reporters reporters = new Reporters();
private final MetricRegistryFactory factory;
private final RefCountingMap<MetricRegistryInfo, RatisMetricRegistry> registries;
@@ -60,27 +86,12 @@ public class MetricRegistriesImpl extends MetricRegistries {
@Override
public RatisMetricRegistry create(MetricRegistryInfo info) {
- return registries.put(info, () -> {
- if (reporterRegistrations.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("First MetricRegistry has been created without registering reporters. " +
- "Hence registering JMX reporter by default.");
- }
- enableJmxReporter();
- }
- RatisMetricRegistry registry = factory.create(info);
- reporterRegistrations.forEach(reg -> reg.accept(registry));
- return registry;
- });
+ return registries.put(info, () -> reporters.create(info));
}
@Override
public boolean remove(MetricRegistryInfo key) {
- RatisMetricRegistry registry = registries.get(key);
- if (registry != null) {
- stopReporters.forEach(reg -> reg.accept(registry));
- }
-
+ get(key).ifPresent(reporters::remove);
return registries.remove(key) == null;
}
@@ -111,12 +122,12 @@ public class MetricRegistriesImpl extends MetricRegistries {
LOG.warn("New reporters are added after registries were created. Some metrics will be missing from the reporter. "
+ "Please add reporter before adding any new registry.");
}
- this.reporterRegistrations.add(reporterRegistration);
- this.stopReporters.add(stopReporter);
+ reporters.add(reporterRegistration, stopReporter);
}
@Override
public void enableJmxReporter() {
+ System.out.println("enableJmxReporter");
addReporterRegistration(
MetricsReporting.jmxReporter(),
MetricsReporting.stopJmxReporter());
BTW, there is probably another race condition in the original code: the two lists are not updated under a common lock when reporters are added, so a concurrent reader can see one list updated without the other. https://github.com/apache/ratis/blob/ab9fc535dc6f48ba1d93687fc46022989beb6b0a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java#L114-L115
I used the code from commit https://github.com/apache/ratis/pull/744/commits/f0a09fabf1eb488e5849a7dc105e6cc7047ca678 to test Ozone S3. With a multipart upload using 100 concurrent threads, the gRPC message metric leaks still appear. This is the test code:
import boto3
from boto3.s3.transfer import TransferConfig
# Size unit and multipart-upload tuning for the test run.
MB = 1024 * 1024
MultipartChunkSize = 5 * MB  # passed to TransferConfig(multipart_chunksize=...)
MaxConcurrency = 100  # passed to TransferConfig(max_concurrency=...)

# Connection settings for the local S3 endpoint under test
# (credentials are for the local test setup only).
AccessKeyID = "chen"
SecretAccessKey = "user_key"
BucketRegion = "dummyregion"
BucketName = "testbucket"
EndpointUrl = "http://localhost:9878"

# File to upload and the object key it is stored under.
LocalFilePath = "/Users/pony.chen/500M.img"
ObjectKeyName = "key_from_py1"
def multipart_upload(client, local_file_path, bucket_name, object_key, multipart_chunksize,
                     max_concurrency):
    """Upload a local file through boto3's managed transfer with explicit tuning.

    Args:
        client: boto3 S3 client used to perform the upload.
        local_file_path: path of the file to upload.
        bucket_name: destination bucket.
        object_key: destination object key.
        multipart_chunksize: part size handed to ``TransferConfig``.
        max_concurrency: concurrency limit handed to ``TransferConfig``.
    """
    transfer_config = TransferConfig(
        multipart_chunksize=multipart_chunksize,
        max_concurrency=max_concurrency,
    )
    client.upload_file(
        local_file_path,
        bucket_name,
        object_key,
        Config=transfer_config,
    )
if __name__ == '__main__':
    # Build a client for the local S3 endpoint. region_name is set explicitly so that
    # the region used for request signing comes from BucketRegion, not from whatever
    # ambient AWS config/environment happens to be present.
    client = boto3.client('s3',
                          endpoint_url=EndpointUrl,
                          region_name=BucketRegion,
                          aws_access_key_id=AccessKeyID,
                          aws_secret_access_key=SecretAccessKey)
    multipart_upload(client, LocalFilePath, BucketName, ObjectKeyName, MultipartChunkSize,
                     MaxConcurrency)
I found that the leaked metrics are all `XXX_received_executed` metrics, such as:
[chen@hostname /root/ratis]% curl -s http://localhost:9878/jmx | grep client_message_metrics
//....
"name" : "ratis:name=ratis.client_message_metrics.client-78FD5D54C056->0806d407-c2cb-4628-a986-9d9f4da7393f.client-78FD5D54C056->0806d407-c2cb-4628-a986-9d9f4da7393f_ratis.grpc.RaftClientProtocolService_ordered_OK_received_executed",
"name" : "ratis:name=ratis.client_message_metrics.client-F3678E2734CB->3be0191f-50c8-4c86-9d31-b9d5169c42db.client-F3678E2734CB->3be0191f-50c8-4c86-9d31-b9d5169c42db_ratis.grpc.RaftClientProtocolService_unordered_OK_received_executed",
"name" : "ratis:name=ratis.client_message_metrics.client-268EA6C4F3CA->0806d407-c2cb-4628-a986-9d9f4da7393f.client-268EA6C4F3CA->0806d407-c2cb-4628-a986-9d9f4da7393f_ratis.grpc.RaftClientProtocolService_ordered_OK_received_executed",
"name" : "ratis:name=ratis.client_message_metrics.client-36DFCCCF5CE1->3be0191f-50c8-4c86-9d31-b9d5169c42db.client-36DFCCCF5CE1->3be0191f-50c8-4c86-9d31-b9d5169c42db_ratis.grpc.RaftClientProtocolService_ordered_OK_received_executed",
[chen@hostname /root/ratis]%
So I added some `synchronized` blocks in this commit: https://github.com/apache/ratis/pull/744/commits/22d646a1b7cbed94def3c901e5a4e66874ea2be1
After applying that commit (https://github.com/apache/ratis/pull/744/commits/22d646a1b7cbed94def3c901e5a4e66874ea2be1), all the metric leaks are gone.
The commit adds two `synchronized` blocks. In my tests, if either one of them is removed, the metric leak appears again.
But I don't really understand why this fixes the gRPC message metrics leak. @codings-dan @szetszwo, any suggestions?
@xichen01 , thanks a lot for reporting the bug and fixing it! Since the lists won't change frequently, how about add
synchronizedto all the methods and useArrayListinstead ofCopyOnWriteArrayList?diff --git a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java index cbe8d7bf..a459509d 100644 --- a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java +++ b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java @@ -17,12 +17,12 @@ */ package org.apache.ratis.metrics.impl; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; import org.apache.ratis.metrics.MetricRegistries; @@ -41,10 +41,36 @@ public class MetricRegistriesImpl extends MetricRegistries { private static final Logger LOG = LoggerFactory.getLogger(MetricRegistriesImpl.class); - private final List<Consumer<RatisMetricRegistry>> reporterRegistrations = new CopyOnWriteArrayList<>(); + class Reporters { + private final List<Consumer<RatisMetricRegistry>> registrations = new ArrayList<>(); + private final List<Consumer<RatisMetricRegistry>> stoppers = new ArrayList<>(); - private final List<Consumer<RatisMetricRegistry>> stopReporters = new CopyOnWriteArrayList<>(); + synchronized RatisMetricRegistry create(MetricRegistryInfo info) { + System.out.println("isEmpty? " + registrations.isEmpty()); + if (registrations.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("First MetricRegistry has been created without registering reporters. 
" + + "Hence registering JMX reporter by default."); + } + enableJmxReporter(); + } + + final RatisMetricRegistry registry = factory.create(info); + registrations.forEach(reg -> reg.accept(registry)); + return registry; + } + synchronized void add(Consumer<RatisMetricRegistry> registration, Consumer<RatisMetricRegistry> stopper) { + registrations.add(registration); + stoppers.add(stopper); + } + + synchronized void remove(RatisMetricRegistry registry) { + stoppers.forEach(reg -> reg.accept(registry)); + } + } + + private final Reporters reporters = new Reporters(); private final MetricRegistryFactory factory; private final RefCountingMap<MetricRegistryInfo, RatisMetricRegistry> registries; @@ -60,27 +86,12 @@ public class MetricRegistriesImpl extends MetricRegistries { @Override public RatisMetricRegistry create(MetricRegistryInfo info) { - return registries.put(info, () -> { - if (reporterRegistrations.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("First MetricRegistry has been created without registering reporters. " + - "Hence registering JMX reporter by default."); - } - enableJmxReporter(); - } - RatisMetricRegistry registry = factory.create(info); - reporterRegistrations.forEach(reg -> reg.accept(registry)); - return registry; - }); + return registries.put(info, () -> reporters.create(info)); } @Override public boolean remove(MetricRegistryInfo key) { - RatisMetricRegistry registry = registries.get(key); - if (registry != null) { - stopReporters.forEach(reg -> reg.accept(registry)); - } - + get(key).ifPresent(reporters::remove); return registries.remove(key) == null; } @@ -111,12 +122,12 @@ public class MetricRegistriesImpl extends MetricRegistries { LOG.warn("New reporters are added after registries were created. Some metrics will be missing from the reporter. 
" + "Please add reporter before adding any new registry."); } - this.reporterRegistrations.add(reporterRegistration); - this.stopReporters.add(stopReporter); + reporters.add(reporterRegistration, stopReporter); } @Override public void enableJmxReporter() { + System.out.println("enableJmxReporter"); addReporterRegistration( MetricsReporting.jmxReporter(), MetricsReporting.stopJmxReporter());
`ArrayList` with `synchronized` may be easier to maintain than `CopyOnWriteArrayList`; I will try this.
Hi @xichen01, could you rebase this patch onto the master branch when you have time? It seems there are some conflicts now.