grpc-kotlin icon indicating copy to clipboard operation
grpc-kotlin copied to clipboard

Cannot catch Exception from Server Stub

Open ryanhall07 opened this issue 4 years ago • 6 comments

I am attempting to install a default Exception handler for our gRPC server stubs. I have tried using both a java gRPC interceptor and coroutine exception handlers, but nothing seems to work. The following code snippet below replicates the issue.

val handler = CoroutineExceptionHandler { _, ex ->
  println("this never happens!!")
}

internal class TestGreeterGrpcService : GreeterGrpcKt.GreeterCoroutineImplBase(handler) {
  override suspend fun sayHello(request: HelloRequest): HelloReply = throw RuntimeException("boom")
}

Using the debugger I traced the issue to here

https://github.com/grpc/grpc-kotlin/blob/master/stub/src/main/java/io/grpc/kotlin/ServerCalls.kt#L229

It seems like the CoroutineContext provided to the stub is not used in the Flow that dispatches requests.

I'm fairly new to coroutines so maybe I'm doing something wrong.

ryanhall07 avatar Jul 10 '20 20:07 ryanhall07

Have you read the documentation of CoroutineExceptionHandler? It only ever applies under very specific circumstances.

lowasser avatar Jul 14 '20 19:07 lowasser

yes I read the docs and was the original reason I was attempting to install the exception handler inside a gRPC interceptor. Maybe I should take a step back to explain what I am trying to do.

I don't want to wrap every gRPC service impl with a try-catch to log exceptions. Rather install an exception handler in an interceptor so it applies to every gRPC service. I originally tried doing it inside of a ServerInterceptor but the Exception would never catch. After switching to grpc-java from grpc-kotlin, the ServerInterceptor worked just fine.

I'm basically using a version of this https://github.com/grpc/grpc-java/issues/1552#issuecomment-219476882

ryanhall07 avatar Jul 14 '20 23:07 ryanhall07

I think you could use a SimpleForwardingServerCall in the interceptor and override the close method, checking for the status there. Fields you could look at are ofc the status.code (which most likely will be UNKNOWN if there was an exception in your server) and/or the status.cause which would give you the actual exception.

skomp avatar Jul 24 '20 07:07 skomp

Implement ServerInterceptor with custom ServerCall.Listener doesn't work. onHalfClose() doesn't produce exception.
Exception only is catch here https://github.com/grpc/grpc-kotlin/blob/master/stub/src/main/java/io/grpc/kotlin/ServerCalls.kt#L233, but how to propagate this to global exception handler?

quckly avatar Aug 14 '20 12:08 quckly

I got really stuck on this as well and was about to switch to grpc-java, but then saw how TransmitStatusRuntimeExceptionInterceptor implemented a custom ServerCall. I'm going to post my solution here to help anyone who comes across this in the future. There are some custom exceptions and extension functions (ex. getLogPrefix for request and caller ID tracing) that I'm using in my project, but I'll leave them in just to give an idea of what I was getting at.

private val logger = KotlinLogging.logger {}

class LoggingAndExceptionTranslationServerInterceptor : ServerInterceptor {

    private class ExceptionTranslatingServerCall<ReqT, RespT>(
        delegate: ServerCall<ReqT, RespT>
    ) : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(delegate) {

        override fun close(status: Status, trailers: Metadata) {
            val newStatus = if (!status.isOk) {
                val cause = status.cause
                logger.error(logger.nullIfNotDebug(cause)) { "${getLogPrefix()}closing due to error" }

                if (status.code == Status.Code.UNKNOWN) {
                    val newStatus = when (cause) {
                        is IllegalArgumentException -> Status.INVALID_ARGUMENT
                        is IllegalStateException -> Status.FAILED_PRECONDITION
                        is NotFoundException -> Status.NOT_FOUND
                        is ConflictException -> Status.ALREADY_EXISTS
                        is UnauthenticationException -> Status.UNAUTHENTICATED
                        is UnauthorizationException -> Status.PERMISSION_DENIED
                        else -> Status.UNKNOWN
                    }
                    newStatus.withDescription(cause?.message).withCause(cause)
                } else
                    status
            } else {
                logger.trace { "${getLogPrefix()}closing" }
                status
            }

            super.close(newStatus, trailers)
        }
    }

    private class LoggingServerCallListener<ReqT>(
        delegate: ServerCall.Listener<ReqT>
    ) : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegate) {

        override fun onMessage(message: ReqT) {
            logger.trace { "${getLogPrefix()}message: $message" }
            try {
                super.onMessage(message)
            } catch (t: Throwable) {
                if (logger.isDebugEnabled)
                    logger.debug(t) { "${getLogPrefix()}error on message: $message" }
                else
                    logger.error { "${getLogPrefix()}error handling message" }
                throw t
            }
        }

        override fun onHalfClose() {
            logger.trace { "${getLogPrefix()}half-close" }
            try {
                super.onHalfClose()
            } catch (t: Throwable) {
                logger.error(logger.nullIfNotDebug(t)) { "${getLogPrefix()}error handling half-close" }
                throw t
            }
        }

        override fun onCancel() {
            logger.trace { "${getLogPrefix()}cancel" }
            try {
                super.onCancel()
            } catch (t: Throwable) {
                logger.error(logger.nullIfNotDebug(t)) { "${getLogPrefix()}error handling cancel" }
                throw t
            }
        }

        override fun onComplete() {
            logger.trace { "${getLogPrefix()}complete" }
            try {
                super.onComplete()
            } catch (t: Throwable) {
                logger.error(logger.nullIfNotDebug(t)) { "${getLogPrefix()}error handling complete" }
                throw t
            }
        }

        override fun onReady() {
            logger.trace { "${getLogPrefix()}ready" }
            try {
                super.onReady()
            } catch (t: Throwable) {
                logger.error(logger.nullIfNotDebug(t)) { "${getLogPrefix()}error handling ready" }
                throw t
            }
        }
    }

    override fun <ReqT : Any, RespT : Any> interceptCall(
        call: ServerCall<ReqT, RespT>,
        metadata: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> =
        LoggingServerCallListener(next.startCall(ExceptionTranslatingServerCall(call), metadata))
}

jonpeterson avatar Sep 22 '20 00:09 jonpeterson

@jonpeterson Thanks for sharing that! I was trying to use onHalfClose etc and finding they did nothing.

In the implementation you shared, is the LoggingServerCallListener superfluous? It's what you'd want in grpc-java, right, but exceptions never show up in it in grpc-kotlin? So I think this would suffice:

package com.yourproject

import io.grpc.ForwardingServerCall
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import io.grpc.Status
import mu.KotlinLogging
import javax.inject.Singleton

private val logger = KotlinLogging.logger {}

/**
 * Log all exceptions thrown from gRPC endpoints, and adjust Status for known exceptions.
 */
@Singleton
class ExceptionInterceptor : ServerInterceptor {

    /**
     * When closing a gRPC call, extract any error status information to top-level fields. Also
     * log the cause of errors.
     */
    private class ExceptionTranslatingServerCall<ReqT, RespT>(
        delegate: ServerCall<ReqT, RespT>
    ) : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(delegate) {

        override fun close(status: Status, trailers: Metadata) {
            if (status.isOk) {
                return super.close(status, trailers)
            }
            val cause = status.cause
            var newStatus = status

            logger.error(cause) { "Error handling gRPC endpoint." }

            if (status.code == Status.Code.UNKNOWN) {
                val translatedStatus = when (cause) {
                    is IllegalArgumentException -> Status.INVALID_ARGUMENT
                    is IllegalStateException -> Status.FAILED_PRECONDITION
                    else -> Status.UNKNOWN
                }
                newStatus = translatedStatus.withDescription(cause?.message).withCause(cause)
            }

            super.close(newStatus, trailers)
        }
    }

    override fun <ReqT : Any, RespT : Any> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        return next.startCall(ExceptionTranslatingServerCall(call), headers)
    }
}

mfickett avatar Nov 13 '20 15:11 mfickett