RATIS-1705. Fix metrics leak

Open xichen01 opened this issue 3 years ago • 4 comments

What changes were proposed in this pull request?

(https://github.com/apache/ratis/blob/ab9fc535dc6f48ba1d93687fc46022989beb6b0a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java#L61-L72)

When MetricRegistriesImpl#create is executed by multiple threads, enableJmxReporter() can be executed multiple times, because the reporterRegistrations.isEmpty() check and the subsequent registration are not performed under a lock.
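
The check-then-act pattern described above can be sketched as follows. This is a minimal illustration, not the Ratis code: the class `DefaultReporterOnce`, its `create()` method, and the counter standing in for `enableJmxReporter()` are all hypothetical names chosen for the example. It shows the guarded variant, where the emptiness check and the add happen under one lock, so only one caller can observe an empty list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the reporter bookkeeping; not the Ratis class. */
class DefaultReporterOnce {
  private final List<String> registrations = new ArrayList<>();
  private final AtomicInteger defaultEnabled = new AtomicInteger();

  /** The isEmpty() check and the add run under the same lock. */
  synchronized void create() {
    if (registrations.isEmpty()) {
      defaultEnabled.incrementAndGet(); // stands in for enableJmxReporter()
      registrations.add("jmx");
    }
  }

  int defaultEnabledCount() {
    return defaultEnabled.get();
  }

  /** Calls create() from many threads released together; returns how often the default was enabled. */
  static int runConcurrently(int threads) {
    final DefaultReporterOnce reporters = new DefaultReporterOnce();
    final ExecutorService pool = Executors.newFixedThreadPool(threads);
    final CountDownLatch start = new CountDownLatch(1);
    for (int i = 0; i < threads; i++) {
      pool.execute(() -> {
        try {
          start.await(); // wait so that all workers contend at once
          reporters.create();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    start.countDown();
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return reporters.defaultEnabledCount();
  }

  public static void main(String[] args) {
    System.out.println("default reporter enabled " + runConcurrently(100) + " time(s)");
  }
}
```

Without `synchronized` on `create()`, two threads could both see an empty list before either adds to it, and the default would be enabled more than once; that interleaving is timing-dependent, so it may not show up on every run.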

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/RATIS-1705

How was this patch tested?

With this patch, the bug described in https://issues.apache.org/jira/browse/RATIS-1705 no longer reproduces.

xichen01 avatar Sep 12 '22 15:09 xichen01

@xichen01, thanks a lot for reporting the bug and fixing it! Since the lists won't change frequently, how about adding synchronized to all the methods and using ArrayList instead of CopyOnWriteArrayList?

diff --git a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java
index cbe8d7bf..a459509d 100644
--- a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java
+++ b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java
@@ -17,12 +17,12 @@
  */
 package org.apache.ratis.metrics.impl;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Consumer;
 
 import org.apache.ratis.metrics.MetricRegistries;
@@ -41,10 +41,36 @@ public class MetricRegistriesImpl extends MetricRegistries {
 
   private static final Logger LOG = LoggerFactory.getLogger(MetricRegistriesImpl.class);
 
-  private final List<Consumer<RatisMetricRegistry>> reporterRegistrations = new CopyOnWriteArrayList<>();
+  class Reporters {
+    private final List<Consumer<RatisMetricRegistry>> registrations = new ArrayList<>();
+    private final List<Consumer<RatisMetricRegistry>> stoppers = new ArrayList<>();
 
-  private final List<Consumer<RatisMetricRegistry>> stopReporters = new CopyOnWriteArrayList<>();
+    synchronized RatisMetricRegistry create(MetricRegistryInfo info) {
+      System.out.println("isEmpty? " + registrations.isEmpty());
+      if (registrations.isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("First MetricRegistry has been created without registering reporters. "
+              + "Hence registering JMX reporter by default.");
+        }
+        enableJmxReporter();
+      }
+
+      final RatisMetricRegistry registry = factory.create(info);
+      registrations.forEach(reg -> reg.accept(registry));
+      return registry;
+    }
 
+    synchronized void add(Consumer<RatisMetricRegistry> registration, Consumer<RatisMetricRegistry> stopper) {
+      registrations.add(registration);
+      stoppers.add(stopper);
+    }
+
+    synchronized void remove(RatisMetricRegistry registry) {
+      stoppers.forEach(reg -> reg.accept(registry));
+    }
+  }
+
+  private final Reporters reporters = new Reporters();
   private final MetricRegistryFactory factory;
 
   private final RefCountingMap<MetricRegistryInfo, RatisMetricRegistry> registries;
@@ -60,27 +86,12 @@ public class MetricRegistriesImpl extends MetricRegistries {
 
   @Override
   public RatisMetricRegistry create(MetricRegistryInfo info) {
-    return registries.put(info, () -> {
-      if (reporterRegistrations.isEmpty()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("First MetricRegistry has been created without registering reporters. " +
-              "Hence registering JMX reporter by default.");
-        }
-        enableJmxReporter();
-      }
-      RatisMetricRegistry registry = factory.create(info);
-      reporterRegistrations.forEach(reg -> reg.accept(registry));
-      return registry;
-    });
+    return registries.put(info, () -> reporters.create(info));
   }
 
   @Override
   public boolean remove(MetricRegistryInfo key) {
-    RatisMetricRegistry registry = registries.get(key);
-    if (registry != null) {
-      stopReporters.forEach(reg -> reg.accept(registry));
-    }
-
+    get(key).ifPresent(reporters::remove);
     return registries.remove(key) == null;
   }
 
@@ -111,12 +122,12 @@ public class MetricRegistriesImpl extends MetricRegistries {
       LOG.warn("New reporters are added after registries were created. Some metrics will be missing from the reporter. "
           + "Please add reporter before adding any new registry.");
     }
-    this.reporterRegistrations.add(reporterRegistration);
-    this.stopReporters.add(stopReporter);
+    reporters.add(reporterRegistration, stopReporter);
   }
 
   @Override
   public void enableJmxReporter() {
+    System.out.println("enableJmxReporter");
     addReporterRegistration(
         MetricsReporting.jmxReporter(),
         MetricsReporting.stopJmxReporter());

szetszwo avatar Sep 13 '22 16:09 szetszwo

BTW, there is probably another race condition in the original code: adding to the two lists is not synchronized. https://github.com/apache/ratis/blob/ab9fc535dc6f48ba1d93687fc46022989beb6b0a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java#L114-L115

szetszwo avatar Sep 13 '22 16:09 szetszwo

I used the code at commit https://github.com/apache/ratis/pull/744/commits/f0a09fabf1eb488e5849a7dc105e6cc7047ca678 to test Ozone S3. With a multipart upload using 100 threads, the gRPC message metrics leak still appears. This is the test code:

import boto3
from boto3.s3.transfer import TransferConfig

MB = 1024 * 1024
s3 = boto3.resource('s3')

MultipartChunkSize = 5 * MB
MaxConcurrency = 100
AccessKeyID = "chen"
SecretAccessKey = "user_key"
BucketRegion = "dummyregion"
BucketName = "testbucket"
EndpointUrl = "http://localhost:9878"
LocalFilePath = "/Users/pony.chen/500M.img"
ObjectKeyName = "key_from_py1"


def multipart_upload(client, local_file_path, bucket_name, object_key, multipart_chunksize,
                     max_concurrency):
    config = TransferConfig(multipart_chunksize=multipart_chunksize,
                            max_concurrency=max_concurrency)
    client.upload_file(local_file_path, bucket_name, object_key, Config=config)


if __name__ == '__main__':
    client = boto3.client('s3',
                          endpoint_url=EndpointUrl,
                          aws_access_key_id=AccessKeyID,
                          aws_secret_access_key=SecretAccessKey)
    multipart_upload(client, LocalFilePath, BucketName, ObjectKeyName, MultipartChunkSize,
                     MaxConcurrency)

I found that the leaked metrics are all XXX_received_executed metrics, such as:

[chen@hostname /root/ratis]% curl -s http://localhost:9878/jmx | grep client_message_metrics
//....
    "name" : "ratis:name=ratis.client_message_metrics.client-78FD5D54C056->0806d407-c2cb-4628-a986-9d9f4da7393f.client-78FD5D54C056->0806d407-c2cb-4628-a986-9d9f4da7393f_ratis.grpc.RaftClientProtocolService_ordered_OK_received_executed",
    "name" : "ratis:name=ratis.client_message_metrics.client-F3678E2734CB->3be0191f-50c8-4c86-9d31-b9d5169c42db.client-F3678E2734CB->3be0191f-50c8-4c86-9d31-b9d5169c42db_ratis.grpc.RaftClientProtocolService_unordered_OK_received_executed",
    "name" : "ratis:name=ratis.client_message_metrics.client-268EA6C4F3CA->0806d407-c2cb-4628-a986-9d9f4da7393f.client-268EA6C4F3CA->0806d407-c2cb-4628-a986-9d9f4da7393f_ratis.grpc.RaftClientProtocolService_ordered_OK_received_executed",
    "name" : "ratis:name=ratis.client_message_metrics.client-36DFCCCF5CE1->3be0191f-50c8-4c86-9d31-b9d5169c42db.client-36DFCCCF5CE1->3be0191f-50c8-4c86-9d31-b9d5169c42db_ratis.grpc.RaftClientProtocolService_ordered_OK_received_executed",
[chen@hostname /root/ratis]% 

So I added some synchronized keywords in this commit: https://github.com/apache/ratis/pull/744/commits/22d646a1b7cbed94def3c901e5a4e66874ea2be1

After applying this commit, all metrics leaks are gone. Two synchronized keywords are added here; in my test, if either one is missing, the metrics leak appears.

But I don't really understand why this fixes the gRPC message metrics leak. @codings-dan @szetszwo, any suggestions?

xichen01 avatar Sep 15 '22 11:09 xichen01

ArrayList with synchronized may be easier to maintain than CopyOnWriteArrayList. I will try this.

xichen01 avatar Sep 15 '22 12:09 xichen01

Hi @xichen01, could you rebase this patch against the master branch once you have time? It seems there are some conflicts now.

ChenSammi avatar Jul 28 '23 06:07 ChenSammi