grpc-java
grpc-java copied to clipboard
Acquiring Throttle for retries failing with TSAN
The read of the service config in ChannelStreamProvider.newStream is not done in syncContext: https://github.com/grpc/grpc-java/blob/c589441bded0cdf9e321f1d012fb52dd31c172b4/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java#L552
https://github.com/grpc/grpc-java/blob/c589441bded0cdf9e321f1d012fb52dd31c172b4/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java#L280-L282
I also wonder if the behavior is correct, because it seems we are keeping state (Throttle.tokenCount) within the service config, which means it is reset on config updates.
Note that this code is actually thread-safe for the JVM, because retryThrottling is final in ManagedChannelServiceConfig. TSAN doesn't observe that detail. Even so, we don't like to depend on that.
From http://sponge2/26ddebd6-6f90-4aac-90e6-121e02d35041 :
WARNING: ThreadSanitizer: data race (pid=5918)
Read of size 4 at 0x0000cfa8e74c by thread T5 (mutexes: write M0, write M1, write M2):
#0 io.grpc.internal.ManagedChannelImpl.access$1800(Lio/grpc/internal/ManagedChannelImpl;)Lio/grpc/internal/ManagedChannelServiceConfig; ManagedChannelImpl.java:118
#1 io.grpc.internal.ManagedChannelImpl$ChannelStreamProvider.newStream(Lio/grpc/MethodDescriptor;Lio/grpc/CallOptions;Lio/grpc/Metadata;Lio/grpc/Context;)Lio/grpc/internal/ClientStream; ManagedChannelImpl.java:552
#2 io.grpc.internal.ClientCallImpl.startInternal(Lio/grpc/ClientCall$Listener;Lio/grpc/Metadata;)V ClientCallImpl.java:258
#3 io.grpc.internal.ClientCallImpl.start(Lio/grpc/ClientCall$Listener;Lio/grpc/Metadata;)V ClientCallImpl.java:191
#4 io.grpc.xds.XdsNameResolver$ConfigSelector$1ClusterSelectionInterceptor$1.start(Lio/grpc/ClientCall$Listener;Lio/grpc/Metadata;)V XdsNameResolver.java:527
#5 io.grpc.internal.ManagedChannelImpl$ConfigSelectingClientCall.start(Lio/grpc/ClientCall$Listener;Lio/grpc/Metadata;)V ManagedChannelImpl.java:1223
#6 io.grpc.internal.DelayedClientCall$2.run()V DelayedClientCall.java:185
#7 io.grpc.internal.DelayedClientCall.drainPendingCalls()V DelayedClientCall.java:283
#8 io.grpc.internal.DelayedClientCall.access$100(Lio/grpc/internal/DelayedClientCall;)V DelayedClientCall.java:50
#9 io.grpc.internal.DelayedClientCall$1.run()V DelayedClientCall.java:155
#10 io.grpc.internal.ManagedChannelImpl$RealChannel$PendingCall$1.run()V ManagedChannelImpl.java:1119
#11 io.grpc.stub.ClientCalls$ThreadlessExecutor.runQuietly(Ljava/lang/Runnable;)V ClientCalls.java:771
#12 io.grpc.stub.ClientCalls$ThreadlessExecutor.waitAndDrain()V ClientCalls.java:754
#13 io.grpc.stub.ClientCalls.blockingUnaryCall(Lio/grpc/Channel;Lio/grpc/MethodDescriptor;Lio/grpc/CallOptions;Ljava/lang/Object;)Ljava/lang/Object; ClientCalls.java:157
#14 io.grpc.testing.protobuf.SimpleServiceGrpc$SimpleServiceBlockingStub.unaryRpc(Lio/grpc/testing/protobuf/SimpleRequest;)Lio/grpc/testing/protobuf/SimpleResponse; SimpleServiceGrpc.java:316
#15 io.grpc.xds.FakeControlPlaneXdsIntegrationTest.pingPong()V FakeControlPlaneXdsIntegrationTest.java:212
Previous write of size 4 at 0x0000cfa8e74c by thread T23 (mutexes: write M3):
#0 io.grpc.internal.ManagedChannelImpl.access$1802(Lio/grpc/internal/ManagedChannelImpl;Lio/grpc/internal/ManagedChannelServiceConfig;)Lio/grpc/internal/ManagedChannelServiceConfig; ManagedChannelImpl.java:118
#1 io.grpc.internal.ManagedChannelImpl$NameResolverListener$1NamesResolved.run()V ManagedChannelImpl.java:1838
#2 io.grpc.SynchronizationContext.drain()V SynchronizationContext.java:95
#3 io.grpc.SynchronizationContext.execute(Ljava/lang/Runnable;)V SynchronizationContext.java:127
#4 io.grpc.xds.XdsNameResolver$ResolveState$RouteDiscoveryState.onChanged(Lio/grpc/xds/XdsClient$RdsUpdate;)V XdsNameResolver.java:906
#5 io.grpc.xds.ClientXdsClient$ResourceSubscriber.notifyWatcher(Lio/grpc/xds/XdsClient$ResourceWatcher;Lio/grpc/xds/XdsClient$ResourceUpdate;)V ClientXdsClient.java:2601
#6 io.grpc.xds.ClientXdsClient$ResourceSubscriber.onData(Lio/grpc/xds/ClientXdsClient$ParsedResource;Ljava/lang/String;J)V ClientXdsClient.java:2552
#7 io.grpc.xds.ClientXdsClient.handleResourceUpdate(Lio/grpc/xds/Bootstrapper$ServerInfo;Lio/grpc/xds/AbstractXdsClient$ResourceType;Ljava/util/Map;Ljava/util/Set;Ljava/util/Set;Ljava/lang/String;Ljava/lang/String;Ljava/util/List;)V ClientXdsClient.java:2369
#8 io.grpc.xds.ClientXdsClient.handleRdsResponse(Lio/grpc/xds/Bootstrapper$ServerInfo;Ljava/lang/String;Ljava/util/List;Ljava/lang/String;)V ClientXdsClient.java:1486
#9 io.grpc.xds.AbstractXdsClient$AbstractAdsStream.handleRpcResponse(Lio/grpc/xds/AbstractXdsClient$ResourceType;Ljava/lang/String;Ljava/util/List;Ljava/lang/String;)V AbstractXdsClient.java:432
#10 io.grpc.xds.AbstractXdsClient$AdsStreamV3$1$1.run()V AbstractXdsClient.java:598
#11 io.grpc.SynchronizationContext.drain()V SynchronizationContext.java:95
#12 io.grpc.SynchronizationContext.execute(Ljava/lang/Runnable;)V SynchronizationContext.java:127
#13 io.grpc.xds.AbstractXdsClient$AdsStreamV3$1.onNext(Lio/envoyproxy/envoy/service/discovery/v3/DiscoveryResponse;)V AbstractXdsClient.java:589
#14 io.grpc.xds.AbstractXdsClient$AdsStreamV3$1.onNext(Ljava/lang/Object;)V AbstractXdsClient.java:586
#15 io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(Ljava/lang/Object;)V ClientCalls.java:474
#16 io.grpc.ForwardingClientCallListener.onMessage(Ljava/lang/Object;)V ForwardingClientCallListener.java:33
#17 io.grpc.ForwardingClientCallListener.onMessage(Ljava/lang/Object;)V ForwardingClientCallListener.java:33
#18 io.grpc.internal.DelayedClientCall$DelayedListener.onMessage(Ljava/lang/Object;)V DelayedClientCall.java:452
#19 io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal()V ClientCallImpl.java:661
#20 io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext()V ClientCallImpl.java:646
#21 io.grpc.internal.ContextRunnable.run()V ContextRunnable.java:37
#22 io.grpc.internal.SerializingExecutor.run()V SerializingExecutor.java:133
#23 java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V ThreadPoolExecutor.java:1130
#24 java.util.concurrent.ThreadPoolExecutor$Worker.run()V ThreadPoolExecutor.java:630
#25 java.lang.Thread.run()V Thread.java:830