grpc-java icon indicating copy to clipboard operation
grpc-java copied to clipboard

Acquiring Throttle for retries failing with TSAN

Open ejona86 opened this issue 3 years ago • 0 comments

The read of the service config in ChannelStreamProvider.newStream() is not done in the syncContext: https://github.com/grpc/grpc-java/blob/c589441bded0cdf9e321f1d012fb52dd31c172b4/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java#L552

https://github.com/grpc/grpc-java/blob/c589441bded0cdf9e321f1d012fb52dd31c172b4/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java#L280-L282

I also wonder if the behavior is correct, because it seems we are keeping state (Throttle.tokenCount) within the service config, which means it is reset on config updates.

Note that this code is actually thread-safe on the JVM, because retryThrottling is final in ManagedChannelServiceConfig. TSAN doesn't observe that detail. Even so, we don't like to depend on that.

From http://sponge2/26ddebd6-6f90-4aac-90e6-121e02d35041 :

WARNING: ThreadSanitizer: data race (pid=5918)
  Read of size 4 at 0x0000cfa8e74c by thread T5 (mutexes: write M0, write M1, write M2):
    #0 io.grpc.internal.ManagedChannelImpl.access$1800(Lio/grpc/internal/ManagedChannelImpl;)Lio/grpc/internal/ManagedChannelServiceConfig; ManagedChannelImpl.java:118 
    #1 io.grpc.internal.ManagedChannelImpl$ChannelStreamProvider.newStream(Lio/grpc/MethodDescriptor;Lio/grpc/CallOptions;Lio/grpc/Metadata;Lio/grpc/Context;)Lio/grpc/internal/ClientStream; ManagedChannelImpl.java:552 
    #2 io.grpc.internal.ClientCallImpl.startInternal(Lio/grpc/ClientCall$Listener;Lio/grpc/Metadata;)V ClientCallImpl.java:258 
    #3 io.grpc.internal.ClientCallImpl.start(Lio/grpc/ClientCall$Listener;Lio/grpc/Metadata;)V ClientCallImpl.java:191 
    #4 io.grpc.xds.XdsNameResolver$ConfigSelector$1ClusterSelectionInterceptor$1.start(Lio/grpc/ClientCall$Listener;Lio/grpc/Metadata;)V XdsNameResolver.java:527 
    #5 io.grpc.internal.ManagedChannelImpl$ConfigSelectingClientCall.start(Lio/grpc/ClientCall$Listener;Lio/grpc/Metadata;)V ManagedChannelImpl.java:1223 
    #6 io.grpc.internal.DelayedClientCall$2.run()V DelayedClientCall.java:185 
    #7 io.grpc.internal.DelayedClientCall.drainPendingCalls()V DelayedClientCall.java:283 
    #8 io.grpc.internal.DelayedClientCall.access$100(Lio/grpc/internal/DelayedClientCall;)V DelayedClientCall.java:50 
    #9 io.grpc.internal.DelayedClientCall$1.run()V DelayedClientCall.java:155 
    #10 io.grpc.internal.ManagedChannelImpl$RealChannel$PendingCall$1.run()V ManagedChannelImpl.java:1119 
    #11 io.grpc.stub.ClientCalls$ThreadlessExecutor.runQuietly(Ljava/lang/Runnable;)V ClientCalls.java:771 
    #12 io.grpc.stub.ClientCalls$ThreadlessExecutor.waitAndDrain()V ClientCalls.java:754 
    #13 io.grpc.stub.ClientCalls.blockingUnaryCall(Lio/grpc/Channel;Lio/grpc/MethodDescriptor;Lio/grpc/CallOptions;Ljava/lang/Object;)Ljava/lang/Object; ClientCalls.java:157 
    #14 io.grpc.testing.protobuf.SimpleServiceGrpc$SimpleServiceBlockingStub.unaryRpc(Lio/grpc/testing/protobuf/SimpleRequest;)Lio/grpc/testing/protobuf/SimpleResponse; SimpleServiceGrpc.java:316 
    #15 io.grpc.xds.FakeControlPlaneXdsIntegrationTest.pingPong()V FakeControlPlaneXdsIntegrationTest.java:212 

  Previous write of size 4 at 0x0000cfa8e74c by thread T23 (mutexes: write M3):
    #0 io.grpc.internal.ManagedChannelImpl.access$1802(Lio/grpc/internal/ManagedChannelImpl;Lio/grpc/internal/ManagedChannelServiceConfig;)Lio/grpc/internal/ManagedChannelServiceConfig; ManagedChannelImpl.java:118 
    #1 io.grpc.internal.ManagedChannelImpl$NameResolverListener$1NamesResolved.run()V ManagedChannelImpl.java:1838 
    #2 io.grpc.SynchronizationContext.drain()V SynchronizationContext.java:95 
    #3 io.grpc.SynchronizationContext.execute(Ljava/lang/Runnable;)V SynchronizationContext.java:127 
    #4 io.grpc.xds.XdsNameResolver$ResolveState$RouteDiscoveryState.onChanged(Lio/grpc/xds/XdsClient$RdsUpdate;)V XdsNameResolver.java:906 
    #5 io.grpc.xds.ClientXdsClient$ResourceSubscriber.notifyWatcher(Lio/grpc/xds/XdsClient$ResourceWatcher;Lio/grpc/xds/XdsClient$ResourceUpdate;)V ClientXdsClient.java:2601 
    #6 io.grpc.xds.ClientXdsClient$ResourceSubscriber.onData(Lio/grpc/xds/ClientXdsClient$ParsedResource;Ljava/lang/String;J)V ClientXdsClient.java:2552 
    #7 io.grpc.xds.ClientXdsClient.handleResourceUpdate(Lio/grpc/xds/Bootstrapper$ServerInfo;Lio/grpc/xds/AbstractXdsClient$ResourceType;Ljava/util/Map;Ljava/util/Set;Ljava/util/Set;Ljava/lang/String;Ljava/lang/String;Ljava/util/List;)V ClientXdsClient.java:2369 
    #8 io.grpc.xds.ClientXdsClient.handleRdsResponse(Lio/grpc/xds/Bootstrapper$ServerInfo;Ljava/lang/String;Ljava/util/List;Ljava/lang/String;)V ClientXdsClient.java:1486 
    #9 io.grpc.xds.AbstractXdsClient$AbstractAdsStream.handleRpcResponse(Lio/grpc/xds/AbstractXdsClient$ResourceType;Ljava/lang/String;Ljava/util/List;Ljava/lang/String;)V AbstractXdsClient.java:432 
    #10 io.grpc.xds.AbstractXdsClient$AdsStreamV3$1$1.run()V AbstractXdsClient.java:598 
    #11 io.grpc.SynchronizationContext.drain()V SynchronizationContext.java:95 
    #12 io.grpc.SynchronizationContext.execute(Ljava/lang/Runnable;)V SynchronizationContext.java:127 
    #13 io.grpc.xds.AbstractXdsClient$AdsStreamV3$1.onNext(Lio/envoyproxy/envoy/service/discovery/v3/DiscoveryResponse;)V AbstractXdsClient.java:589 
    #14 io.grpc.xds.AbstractXdsClient$AdsStreamV3$1.onNext(Ljava/lang/Object;)V AbstractXdsClient.java:586 
    #15 io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(Ljava/lang/Object;)V ClientCalls.java:474 
    #16 io.grpc.ForwardingClientCallListener.onMessage(Ljava/lang/Object;)V ForwardingClientCallListener.java:33 
    #17 io.grpc.ForwardingClientCallListener.onMessage(Ljava/lang/Object;)V ForwardingClientCallListener.java:33 
    #18 io.grpc.internal.DelayedClientCall$DelayedListener.onMessage(Ljava/lang/Object;)V DelayedClientCall.java:452 
    #19 io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal()V ClientCallImpl.java:661 
    #20 io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext()V ClientCallImpl.java:646 
    #21 io.grpc.internal.ContextRunnable.run()V ContextRunnable.java:37 
    #22 io.grpc.internal.SerializingExecutor.run()V SerializingExecutor.java:133 
    #23 java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V ThreadPoolExecutor.java:1130 
    #24 java.util.concurrent.ThreadPoolExecutor$Worker.run()V ThreadPoolExecutor.java:630 
    #25 java.lang.Thread.run()V Thread.java:830 

ejona86 avatar Jun 13 '22 17:06 ejona86