grpc-kotlin
grpc-kotlin copied to clipboard
Cannot catch Exception from Server Stub
I am attempting to install a default Exception handler for our gRPC server stubs. I have tried using both a java gRPC interceptor and coroutine exception handlers, but nothing seems to work. The following code snippet below replicates the issue.
val handler = CoroutineExceptionHandler { _, ex ->
println("this never happens!!")
}
internal class TestGreeterGrpcService : GreeterGrpcKt.GreeterCoroutineImplBase(handler) {
override suspend fun sayHello(request: HelloRequest): HelloReply = throw RuntimeException("boom")
}
Using the debugger I traced the issue to here
https://github.com/grpc/grpc-kotlin/blob/master/stub/src/main/java/io/grpc/kotlin/ServerCalls.kt#L229
It seems like the CoroutineContext provided to the stub is not used in the Flow that dispatches requests.
I'm fairly new to coroutines so maybe I'm doing something wrong.
Have you read the documentation of CoroutineExceptionHandler
? It only ever applies under very specific circumstances.
yes I read the docs and was the original reason I was attempting to install the exception handler inside a gRPC interceptor. Maybe I should take a step back to explain what I am trying to do.
I don't want to wrap every gRPC service impl with a try-catch to log exceptions. Rather install an exception handler in an interceptor so it applies to every gRPC service. I originally tried doing it inside of a ServerInterceptor
but the Exception would never catch. After switching to grpc-java from grpc-kotlin, the ServerInterceptor
worked just fine.
I'm basically using a version of this https://github.com/grpc/grpc-java/issues/1552#issuecomment-219476882
I think you could use a SimpleForwardingServerCall
in the interceptor and override the close
method, checking for the status there. Fields you could look at are ofc the status.code
(which most likely will be UNKNOWN
if there was an exception in your server) and/or the status.cause
which would give you the actual exception.
Implement ServerInterceptor
with custom ServerCall.Listener
doesn't work. onHalfClose()
doesn't produce exception.
Exception only is catch here https://github.com/grpc/grpc-kotlin/blob/master/stub/src/main/java/io/grpc/kotlin/ServerCalls.kt#L233, but how to propagate this to global exception handler?
I got really stuck on this as well and was about to switch to grpc-java, but then saw how TransmitStatusRuntimeExceptionInterceptor
implemented a custom ServerCall
. I'm going to post my solution here to help anyone who comes across this in the future. There are some custom exceptions and extension functions (ex. getLogPrefix
for request and caller ID tracing) that I'm using in my project, but I'll leave them in just to give an idea of what I was getting at.
private val logger = KotlinLogging.logger {}
class LoggingAndExceptionTranslationServerInterceptor : ServerInterceptor {
private class ExceptionTranslatingServerCall<ReqT, RespT>(
delegate: ServerCall<ReqT, RespT>
) : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(delegate) {
override fun close(status: Status, trailers: Metadata) {
val newStatus = if (!status.isOk) {
val cause = status.cause
logger.error(logger.nullIfNotDebug(cause)) { "${getLogPrefix()}closing due to error" }
if (status.code == Status.Code.UNKNOWN) {
val newStatus = when (cause) {
is IllegalArgumentException -> Status.INVALID_ARGUMENT
is IllegalStateException -> Status.FAILED_PRECONDITION
is NotFoundException -> Status.NOT_FOUND
is ConflictException -> Status.ALREADY_EXISTS
is UnauthenticationException -> Status.UNAUTHENTICATED
is UnauthorizationException -> Status.PERMISSION_DENIED
else -> Status.UNKNOWN
}
newStatus.withDescription(cause?.message).withCause(cause)
} else
status
} else {
logger.trace { "${getLogPrefix()}closing" }
status
}
super.close(newStatus, trailers)
}
}
private class LoggingServerCallListener<ReqT>(
delegate: ServerCall.Listener<ReqT>
) : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegate) {
override fun onMessage(message: ReqT) {
logger.trace { "${getLogPrefix()}message: $message" }
try {
super.onMessage(message)
} catch (t: Throwable) {
if (logger.isDebugEnabled)
logger.debug(t) { "${getLogPrefix()}error on message: $message" }
else
logger.error { "${getLogPrefix()}error handling message" }
throw t
}
}
override fun onHalfClose() {
logger.trace { "${getLogPrefix()}half-close" }
try {
super.onHalfClose()
} catch (t: Throwable) {
logger.error(logger.nullIfNotDebug(t)) { "${getLogPrefix()}error handling half-close" }
throw t
}
}
override fun onCancel() {
logger.trace { "${getLogPrefix()}cancel" }
try {
super.onCancel()
} catch (t: Throwable) {
logger.error(logger.nullIfNotDebug(t)) { "${getLogPrefix()}error handling cancel" }
throw t
}
}
override fun onComplete() {
logger.trace { "${getLogPrefix()}complete" }
try {
super.onComplete()
} catch (t: Throwable) {
logger.error(logger.nullIfNotDebug(t)) { "${getLogPrefix()}error handling complete" }
throw t
}
}
override fun onReady() {
logger.trace { "${getLogPrefix()}ready" }
try {
super.onReady()
} catch (t: Throwable) {
logger.error(logger.nullIfNotDebug(t)) { "${getLogPrefix()}error handling ready" }
throw t
}
}
}
override fun <ReqT : Any, RespT : Any> interceptCall(
call: ServerCall<ReqT, RespT>,
metadata: Metadata,
next: ServerCallHandler<ReqT, RespT>
): ServerCall.Listener<ReqT> =
LoggingServerCallListener(next.startCall(ExceptionTranslatingServerCall(call), metadata))
}
@jonpeterson Thanks for sharing that! I was trying to use onHalfClose
etc and finding they did nothing.
In the implementation you shared, is the LoggingServerCallListener
superfluous? It's what you'd want in grpc-java
, right, but exceptions never show up in it in grpc-kotlin
? So I think this would suffice:
package com.yourproject
import io.grpc.ForwardingServerCall
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import io.grpc.Status
import mu.KotlinLogging
import javax.inject.Singleton
private val logger = KotlinLogging.logger {}
/**
* Log all exceptions thrown from gRPC endpoints, and adjust Status for known exceptions.
*/
@Singleton
class ExceptionInterceptor : ServerInterceptor {
/**
* When closing a gRPC call, extract any error status information to top-level fields. Also
* log the cause of errors.
*/
private class ExceptionTranslatingServerCall<ReqT, RespT>(
delegate: ServerCall<ReqT, RespT>
) : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(delegate) {
override fun close(status: Status, trailers: Metadata) {
if (status.isOk) {
return super.close(status, trailers)
}
val cause = status.cause
var newStatus = status
logger.error(cause) { "Error handling gRPC endpoint." }
if (status.code == Status.Code.UNKNOWN) {
val translatedStatus = when (cause) {
is IllegalArgumentException -> Status.INVALID_ARGUMENT
is IllegalStateException -> Status.FAILED_PRECONDITION
else -> Status.UNKNOWN
}
newStatus = translatedStatus.withDescription(cause?.message).withCause(cause)
}
super.close(newStatus, trailers)
}
}
override fun <ReqT : Any, RespT : Any> interceptCall(
call: ServerCall<ReqT, RespT>,
headers: Metadata,
next: ServerCallHandler<ReqT, RespT>
): ServerCall.Listener<ReqT> {
return next.startCall(ExceptionTranslatingServerCall(call), headers)
}
}