grpc-kotlin icon indicating copy to clipboard operation
grpc-kotlin copied to clipboard

Retry does not work with coroutines

Open Cassianokunsch opened this issue 3 years ago • 3 comments

Hello, I made a gRPC service, and I'm trying to make my client retry the call when a specific error is thrown. The retry does not work when the service is implemented with coroutines, but with Java it works fine.

Server code with coroutines:

/**
 * Coroutine-based implementation of the Notification gRPC service used to exercise client retries.
 *
 * Each call to [sendPush] bumps [retryCounter] and then either fails with `UNAVAILABLE`
 * (when a random float falls below the configured threshold) or returns a "success" response.
 */
class NotificationGrpcService(private val notificationService: NotificationService) :
    NotificationGrpcKt.NotificationCoroutineImplBase() {

    /** Number of `sendPush` invocations this instance has received so far. */
    var retryCounter = AtomicInteger(0)

    private val log = LoggerFactory.getLogger(this.javaClass)

    // Threshold compared against Random.nextFloat(); calls drawing a value below it are failed.
    private val UNAVAILABLE_PERCENTAGE = 0.9f
    private val random = Random()

    /**
     * Handles a push request by randomly simulating an outage.
     *
     * @param request the incoming push request (only logged here).
     * @return a response whose message is "success" when no outage is simulated.
     * @throws StatusException with status `UNAVAILABLE` when an outage is simulated.
     */
    override suspend fun sendPush(request: SendPushNotificationRequest): SendPushNotificationResponse {
        log.info("Received a call on method sendPushNotification with payload -> $request")

        val attempt: Int = retryCounter.incrementAndGet()
        val simulateOutage = random.nextFloat() < UNAVAILABLE_PERCENTAGE

        if (!simulateOutage) {
            log.info("Returning successful Hello response, count: $attempt")
            return SendPushNotificationResponse.newBuilder().setMessage("success").build()
        }

        log.info("Returning stubbed UNAVAILABLE error. count: $attempt")
        val unavailable =
            Status.UNAVAILABLE.withDescription("Method notification.Notification.SendPush is unavailable")
        throw StatusException(unavailable)
    }
}

Client

/**
 * Blocking gRPC client that sends a single {@code SendPush} request, optionally over a channel
 * configured with a retrying service config.
 *
 * <p>Retries are enabled unless the environment variable named by
 * {@link #ENV_DISABLE_RETRYING} parses as {@code true}.
 */
public class RetryingHelloWorldClient {
    static final String ENV_DISABLE_RETRYING = "DISABLE_RETRYING_IN_RETRYING_EXAMPLE";

    private static final Logger logger = Logger.getLogger(RetryingHelloWorldClient.class.getName());

    private final boolean enableRetries;
    private final ManagedChannel channel;
    private final GreeterGrpc.GreeterBlockingStub blockingStub;
    private final NotificationGrpc.NotificationBlockingStub blockingStubNotification;
    private final AtomicInteger totalRpcs = new AtomicInteger();
    private final AtomicInteger failedRpcs = new AtomicInteger();

    /**
     * Loads the retrying service config.
     *
     * @return the contents of the {@code retrying_service_config.json} classpath resource
     *     (looked up via this class), parsed by Gson into a map.
     */
    protected Map<String, ?> getRetryingServiceConfig() {
        return new Gson()
                .fromJson(
                        new JsonReader(
                                new InputStreamReader(
                                        RetryingHelloWorldClient.class.getResourceAsStream(
                                                "retrying_service_config.json"),
                                        UTF_8)),
                        Map.class);
    }

    /**
     * Construct client connecting to HelloWorld server at {@code host:port}.
     *
     * @param host server host name
     * @param port server port
     * @param enableRetries when {@code true}, the channel is built with the service config from
     *     {@link #getRetryingServiceConfig()} and retries enabled
     */
    public RetryingHelloWorldClient(String host, int port, boolean enableRetries) {

        ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port)
                // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
                // needing certificates.
                .usePlaintext();
        if (enableRetries) {
            Map<String, ?> serviceConfig = getRetryingServiceConfig();
            logger.info("Client started with retrying configuration: " + serviceConfig);
            channelBuilder.defaultServiceConfig(serviceConfig).enableRetry();
        }
        channel = channelBuilder.build();
        blockingStub = GreeterGrpc.newBlockingStub(channel);
        blockingStubNotification = NotificationGrpc.newBlockingStub(channel);
        this.enableRetries = enableRetries;
    }

    /**
     * Sends one push notification request with a random customer id and logs the outcome.
     *
     * <p>A {@link StatusRuntimeException} is caught, counted in {@code failedRpcs} and logged
     * rather than propagated; {@code totalRpcs} is incremented for every call.
     */
    public void push() {
        SendPushNotificationRequest request = SendPushNotificationRequest.newBuilder().setCustomerId(UUID.randomUUID().toString()).setMessage("sdasd").setTitle("asdad").build();
        SendPushNotificationResponse response = null;
        StatusRuntimeException statusRuntimeException = null;
        try {
            response = blockingStubNotification.sendPush(request);
        } catch (StatusRuntimeException e) {
            failedRpcs.incrementAndGet();
            statusRuntimeException = e;
        }

        totalRpcs.incrementAndGet();

        if (statusRuntimeException == null) {
            logger.log(Level.INFO, "Greeting: {0}", new Object[]{response.getMessage()});
        } else {
            logger.log(Level.INFO, "RPC failed: {0}", new Object[]{statusRuntimeException.getStatus()});
        }
    }

    /**
     * Shuts the channel down and waits briefly for it to terminate, so the channel built in the
     * constructor is not left open when the client is done.
     */
    private void shutdown() throws InterruptedException {
        // Fully qualified because this block must not depend on an import it did not have before.
        channel.shutdown().awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);
    }

    /**
     * Runs a single {@link #push()} against {@code localhost:50051} and then releases the channel.
     */
    public static void main(String[] args) throws Exception {
        boolean enableRetries = !Boolean.parseBoolean(System.getenv(ENV_DISABLE_RETRYING));
        final RetryingHelloWorldClient client = new RetryingHelloWorldClient("localhost", 50051, enableRetries);

        try {
            client.push();
        } finally {
            // Always release the channel, even if push() throws something unexpected.
            client.shutdown();
        }
    }
}

Retry policy config

{
  "methodConfig": [
    {
      "name": [
        {
          "service": "notification.Notification",
          "method": "SendPush"
        }
      ],

      "retryPolicy": {
        "maxAttempts": 5,
        "initialBackoff": "0.5s",
        "maxBackoff": "30s",
        "backoffMultiplier": 2,
        "retryableStatusCodes": [
          "UNAVAILABLE", "UNIMPLEMENTED"
        ]
      }
    }
  ]
}

Server code without coroutines

/**
 * StreamObserver-based (non-coroutine) implementation of the Notification gRPC service used to
 * exercise client retries.
 *
 * Each call to [sendPush] bumps [retryCounter] and then either signals `UNAVAILABLE` to the
 * observer (when a random float falls below the configured threshold) or emits a "success"
 * response and completes the call.
 */
class NotificationGrpcService(private val notificationService: NotificationService) :
    NotificationGrpc.NotificationImplBase() {

    /** Number of `sendPush` invocations this instance has received so far. */
    var retryCounter = AtomicInteger(0)

    private val log = LoggerFactory.getLogger(this.javaClass)

    // Threshold compared against Random.nextFloat(); calls drawing a value below it are failed.
    private val UNAVAILABLE_PERCENTAGE = 0.9f
    private val random = Random()

    /**
     * Handles a push request by randomly simulating an outage.
     *
     * @param request the incoming push request (only logged here).
     * @param responseObserver receives either the `UNAVAILABLE` error or the "success" response
     *   followed by completion; it is dereferenced with `!!`, so a null observer fails at that point.
     */
    override fun sendPush(
        request: SendPushNotificationRequest?,
        responseObserver: StreamObserver<SendPushNotificationResponse>?
    ) {
        log.info("Received a call on method sendPushNotification with payload -> $request")

        val attempt: Int = retryCounter.incrementAndGet()
        val simulateOutage = random.nextFloat() < UNAVAILABLE_PERCENTAGE

        if (simulateOutage) {
            log.info("Returning stubbed UNAVAILABLE error. count: $attempt")
            val observer = responseObserver!!
            val unavailable =
                Status.UNAVAILABLE.withDescription("Method notification.Notification.SendPush is unavailable")
            observer.onError(unavailable.asRuntimeException())
            return
        }

        log.info("Returning successful Hello response, count: $attempt")
        val observer = responseObserver!!
        observer.onNext(SendPushNotificationResponse.newBuilder().setMessage("success").build())
        observer.onCompleted()
    }
}
```
 

Cassianokunsch avatar Jul 23 '21 12:07 Cassianokunsch

Hello! Any updates here?

UDarya avatar Jan 04 '22 08:01 UDarya

What exactly doesn't work? Do you get an error? Something else?

lowasser avatar Jan 25 '22 22:01 lowasser

@lowasser I am also having this problem. I put my case and examples in #334. Thank you for checking it out.

sangyongchoi avatar Mar 27 '22 12:03 sangyongchoi