grpc-kotlin
grpc-kotlin copied to clipboard
Retry does not work with coroutines
Hello, I made a gRPC service, and I'm trying to make my client retry the call when a specific error is thrown. With the coroutine-based service implementation the retry does not work, but with the Java-style (StreamObserver) implementation it works fine.
Server code with coroutines:
/**
 * Coroutine-based implementation of the Notification gRPC service, used here to exercise
 * client-side retries.
 *
 * Each call to [sendPush] draws a random float and fails with `UNAVAILABLE` when the draw is
 * below [UNAVAILABLE_PERCENTAGE] (0.9); otherwise it answers with a "success" message.
 * The injected [notificationService] is not used by this stub.
 */
class NotificationGrpcService(private val notificationService: NotificationService) :
    NotificationGrpcKt.NotificationCoroutineImplBase() {

    private val log = LoggerFactory.getLogger(this.javaClass)
    private val random = Random()

    // Threshold compared against random.nextFloat(); draws below it produce an error.
    private val UNAVAILABLE_PERCENTAGE = 0.9f

    // Number of calls received so far; incremented on every invocation of sendPush.
    var retryCounter = AtomicInteger(0)

    /**
     * Handles a single SendPush call.
     *
     * @param request the incoming request; only logged.
     * @return a response whose message is "success" when no failure is simulated.
     * @throws StatusException with status `UNAVAILABLE` when a failure is simulated.
     */
    override suspend fun sendPush(request: SendPushNotificationRequest): SendPushNotificationResponse {
        log.info("Received a call on method sendPushNotification with payload -> $request")
        val count: Int = retryCounter.incrementAndGet()
        val simulateOutage = random.nextFloat() < UNAVAILABLE_PERCENTAGE

        // Happy path first: return early when this call is not selected to fail.
        if (!simulateOutage) {
            log.info("Returning successful Hello response, count: $count")
            return SendPushNotificationResponse.newBuilder().setMessage("success").build()
        }

        log.info("Returning stubbed UNAVAILABLE error. count: $count")
        val unavailable =
            Status.UNAVAILABLE.withDescription("Method notification.Notification.SendPush is unavailable")
        throw StatusException(unavailable)
    }
}
Client
/**
 * Example client that issues a single SendPush RPC over a plaintext channel, optionally with a
 * retrying service config loaded from {@code retrying_service_config.json}.
 */
public class RetryingHelloWorldClient {
  /** Environment variable that, when it parses as {@code true}, disables retries in {@link #main}. */
  static final String ENV_DISABLE_RETRYING = "DISABLE_RETRYING_IN_RETRYING_EXAMPLE";

  private static final Logger logger = Logger.getLogger(RetryingHelloWorldClient.class.getName());

  private final boolean enableRetries;
  private final ManagedChannel channel;
  private final GreeterGrpc.GreeterBlockingStub blockingStub;
  private final NotificationGrpc.NotificationBlockingStub blockingStubNotification;
  // Counters updated by push(): every call bumps totalRpcs, failed calls also bump failedRpcs.
  private final AtomicInteger totalRpcs = new AtomicInteger();
  private final AtomicInteger failedRpcs = new AtomicInteger();

  /**
   * Loads the retrying service config from the {@code retrying_service_config.json} classpath
   * resource (resolved relative to this class) and parses it into a map.
   *
   * <p>The reader and the underlying resource stream are closed before returning.
   *
   * @return the parsed service config
   * @throws NullPointerException if the resource cannot be found
   * @throws java.io.UncheckedIOException if closing the resource fails
   */
  protected Map<String, ?> getRetryingServiceConfig() {
    // Fail with a descriptive message instead of an anonymous NPE inside InputStreamReader.
    java.io.InputStream configStream = java.util.Objects.requireNonNull(
        RetryingHelloWorldClient.class.getResourceAsStream("retrying_service_config.json"),
        "retrying_service_config.json not found on the classpath");
    // Closing the JsonReader also closes the wrapped reader and stream.
    try (JsonReader reader = new JsonReader(new InputStreamReader(configStream, UTF_8))) {
      return new Gson().fromJson(reader, Map.class);
    } catch (java.io.IOException e) {
      throw new java.io.UncheckedIOException("Failed to close retrying_service_config.json", e);
    }
  }

  /**
   * Construct client connecting to HelloWorld server at {@code host:port}.
   *
   * @param host server host name
   * @param port server port
   * @param enableRetries when true, the channel is built with the retrying service config and
   *     retries enabled
   */
  public RetryingHelloWorldClient(String host, int port, boolean enableRetries) {
    ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port)
        // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
        // needing certificates.
        .usePlaintext();
    if (enableRetries) {
      Map<String, ?> serviceConfig = getRetryingServiceConfig();
      logger.info("Client started with retrying configuration: " + serviceConfig);
      channelBuilder.defaultServiceConfig(serviceConfig).enableRetry();
    }
    channel = channelBuilder.build();
    blockingStub = GreeterGrpc.newBlockingStub(channel);
    blockingStubNotification = NotificationGrpc.newBlockingStub(channel);
    this.enableRetries = enableRetries;
  }

  /**
   * Initiates an orderly shutdown of the channel and waits up to five seconds for it to terminate.
   *
   * @throws InterruptedException if interrupted while waiting for termination
   */
  public void shutdown() throws InterruptedException {
    channel.shutdown().awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);
  }

  /**
   * Sends one SendPush request with a random customer id and logs either the response message
   * or the failure status. A {@link StatusRuntimeException} is caught and counted, not rethrown.
   */
  public void push() {
    SendPushNotificationRequest request = SendPushNotificationRequest.newBuilder()
        .setCustomerId(UUID.randomUUID().toString())
        .setMessage("sdasd")
        .setTitle("asdad")
        .build();
    SendPushNotificationResponse response = null;
    StatusRuntimeException statusRuntimeException = null;
    try {
      response = blockingStubNotification.sendPush(request);
    } catch (StatusRuntimeException e) {
      failedRpcs.incrementAndGet();
      statusRuntimeException = e;
    }

    totalRpcs.incrementAndGet();

    if (statusRuntimeException == null) {
      logger.log(Level.INFO, "Greeting: {0}", new Object[]{response.getMessage()});
    } else {
      logger.log(Level.INFO, "RPC failed: {0}", new Object[]{statusRuntimeException.getStatus()});
    }
  }

  /**
   * Runs a single push against {@code localhost:50051}. Retries are enabled unless the
   * {@link #ENV_DISABLE_RETRYING} environment variable parses as {@code true}.
   */
  public static void main(String[] args) throws Exception {
    boolean enableRetries = !Boolean.parseBoolean(System.getenv(ENV_DISABLE_RETRYING));
    final RetryingHelloWorldClient client = new RetryingHelloWorldClient("localhost", 50051, enableRetries);
    try {
      client.push();
    } finally {
      // Always release the channel, even when push() throws.
      client.shutdown();
    }
  }
}
Retry policy config
{
"methodConfig": [
{
"name": [
{
"service": "notification.Notification",
"method": "SendPush"
}
],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "0.5s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE", "UNIMPLEMENTED"
]
}
}
]
}
Server code without coroutines
/**
 * StreamObserver-based (non-coroutine) implementation of the Notification gRPC service, used
 * here to exercise client-side retries.
 *
 * Each call to [sendPush] draws a random float and reports `UNAVAILABLE` when the draw is below
 * [UNAVAILABLE_PERCENTAGE] (0.9); otherwise it emits a single "success" response and completes.
 * The injected [notificationService] is not used by this stub.
 */
class NotificationGrpcService(private val notificationService: NotificationService) :
    NotificationGrpc.NotificationImplBase() {

    // Number of calls received so far; incremented on every invocation of sendPush.
    var retryCounter = AtomicInteger(0)
    private val log = LoggerFactory.getLogger(this.javaClass)

    // Threshold compared against random.nextFloat(); draws below it produce an error.
    private val UNAVAILABLE_PERCENTAGE = 0.9f
    private val random = Random()

    /**
     * Handles a single SendPush call.
     *
     * @param request the incoming request; only logged.
     * @param responseObserver receives either `onError` with an `UNAVAILABLE` status, or
     *   `onNext` with a "success" response followed by `onCompleted`.
     * @throws IllegalArgumentException if [responseObserver] is null.
     */
    override fun sendPush(
        request: SendPushNotificationRequest?,
        responseObserver: StreamObserver<SendPushNotificationResponse>?
    ) {
        // Validate once up front instead of scattering `!!` through the branches.
        val observer = requireNotNull(responseObserver) { "responseObserver must not be null" }

        log.info("Received a call on method sendPushNotification with payload -> $request")
        val count: Int = retryCounter.incrementAndGet()
        if (random.nextFloat() < UNAVAILABLE_PERCENTAGE) {
            log.info("Returning stubbed UNAVAILABLE error. count: $count")
            observer.onError(
                Status.UNAVAILABLE
                    .withDescription("Method notification.Notification.SendPush is unavailable")
                    .asRuntimeException()
            )
        } else {
            log.info("Returning successful Hello response, count: $count")
            observer.onNext(SendPushNotificationResponse.newBuilder().setMessage("success").build())
            observer.onCompleted()
        }
    }
}
```
Hello! Any updates here?
What exactly doesn't work? Do you get an error? Something else?
@lowasser I am also having this problem. I posted my case and examples in #334. Thank you for checking it out.