grpc-kotlin icon indicating copy to clipboard operation
grpc-kotlin copied to clipboard

Add support for CoroutineContextClientInterceptor

Open trietsch opened this issue 3 years ago • 5 comments
trafficstars

A CoroutineContextClientInterceptor would help out in e.g. setting specific Metadata while using the Coroutine Context. With the ongoing discussion whether or not Metadata should be allowed to be set for every individual gRPC call (https://github.com/grpc/grpc-kotlin/issues/47 vs https://github.com/grpc/grpc-kotlin/issues/224), another alternative (specifically for Authorization, tracing, etc) is missing: an interceptor that has access to and operates in the current coroutine scope.

trietsch avatar Sep 06 '22 12:09 trietsch

Not the prettiest of solutions, but if you want to do this, it's possible using Kotlin Reflection. My goal was to get a value from the metadata of an incoming call, and forward the same value under the same metadata key in the same outgoing call (the app in question was an API Gateway, hence it proxied all requests as is, with some checks).

import io.grpc.*
import io.grpc.kotlin.CoroutineContextServerInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.reflect.full.memberProperties
import io.grpc.Context as GrpcContext

/**
 * This interceptor is used to forward the [CoroutineContext] that is set by a [CoroutineContextServerInterceptor].
 * **ONLY** use this if the [CoroutineContext.Key] for "grpc-kotlin-coroutine-context" has been set in the [CoroutineContextServerInterceptor].
 */
abstract class CoroutineContextClientInterceptor : ClientInterceptor {
    private val grpcContextKey: GrpcContext.Key<CoroutineContext> = getGrpcContextFromServerInterceptor()

    /**
     * We need to get the exact same instance that is used by the CoroutineContextServerInterceptor,
     * Otherwise we cannot get the [CoroutineContext] from the [GrpcContext], as the [GrpcContext] uses reference equality.
     */
    private fun getGrpcContextFromServerInterceptor(): GrpcContext.Key<CoroutineContext> {
        val grpcContextFieldRef = CoroutineContextServerInterceptor.Companion::class.memberProperties.first { it.name == "COROUTINE_CONTEXT_KEY" }

        return grpcContextFieldRef.get(CoroutineContextServerInterceptor.Companion::class.objectInstance!!) as GrpcContext.Key<CoroutineContext>
    }

    abstract fun <ReqT, RespT> coroutineContextClientCall(coroutineContext: CoroutineContext, delegate: ClientCall<ReqT, RespT>): ForwardingClientCall<ReqT, RespT>

    /**
     * Get the [CoroutineContext] from the [GrpcContext] that is set by the [CoroutineContextServerInterceptor].
     */
    private fun getServerInterceptorCoroutineContext(): CoroutineContext = grpcContextKey[GrpcContext.current()]

    override fun <ReqT, RespT> interceptCall(
        method: MethodDescriptor<ReqT, RespT>,
        callOptions: CallOptions,
        next: Channel
    ): ClientCall<ReqT, RespT> {
        return coroutineContextClientCall(getServerInterceptorCoroutineContext(), next.newCall(method, callOptions))
    }
}

And an example implementation of the abstract class.

import io.grpc.ClientCall
import io.grpc.ForwardingClientCall
import io.grpc.Metadata
import io.strmprivacy.grpc.common.client.CoroutineContextClientInterceptor
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.slf4j.LoggerFactory
import kotlin.coroutines.CoroutineContext

/**
 * Takes a string from the CoroutineContext as it has been set in the receiving call, and adds it as a header to the outgoing call.
 */
class SomeClientInterceptor : CoroutineContextClientInterceptor() {
    override fun <ReqT, RespT> coroutineContextClientCall(coroutineContext: CoroutineContext, delegate: ClientCall<ReqT, RespT>): ForwardingClientCall<ReqT, RespT> {
        return SomeClientCall(coroutineContext, delegate)
    }

    class SomeClientCall<ReqT, RespT>(
        private val context: CoroutineContext,
        delegate: ClientCall<ReqT, RespT>
    ) : ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(delegate) {
        private val log by lazy { LoggerFactory.getLogger(javaClass) }

        override fun start(responseListener: Listener<RespT>, headers: Metadata) {
            runBlocking {
                withContext(context) {
                    log.info("Value: ${coroutineContext[SomeContext]?.value}")
                }
            }

            super.start(responseListener, headers)
        }
    }
}

class SomeContext(val value: String?) : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<SomeContext>

    object Constants {
        const val SOME_HEADER = "x-some-header"
        val metadataKey: Metadata.Key<String> = Metadata.Key.of(SOME_HEADER, Metadata.ASCII_STRING_MARSHALLER)
    }

    override val key: CoroutineContext.Key<SomeContext>
        get() = Key
}

trietsch avatar May 22 '23 10:05 trietsch

@trietsch Hey, what should I do if the app is not a gRPC server? In my case, the app is Spring WebFlux server and also a gRPC client. So, what I'm trying to do is using CoWebFilter to set coroutine context with the value from request headers and then take it out from GrpcClientInterceptor to propagate it to every gRPC call. I've done for the CoWebFilter side, but I'm stuck with the ClientInterceptor side(taking it out the coroutine context I've set). Could you give me some hint to achieve it?

+ Is it ok to use runBlocking in your code? I don't want to block the thread every time I do gRPC call.

sgc109 avatar Oct 30 '23 13:10 sgc109

@sgc109 what you'll need to do, is to set a key in the coroutine context that will be used in your client interceptor. You'll have to make sure that you keep a "global" coroutine context that contains the value you want to pass around. In the CoWebFilter you'll get the context and add the key with the corresponding value. In the client interceptor, you get the same context, and get the value by the CoroutineContext.Key. Just make sure that you use the exact same context.

Regarding use of runBlocking, that is afaik the only way to bridge blocking to non-blocking code in Kotlin, but I'm by no means a coroutine expert. As I wanted to set the coroutine context, I was required to bridge to non-blocking code, and therefore I decided to use runBlocking.

trietsch avatar Oct 30 '23 13:10 trietsch

@trietsch Thank you for your quick answer. What I'm stuck with is how I get coroutine context even if the method in ClientInteceptor is not a suspend function. You're passing CoroutineContext object itself through GrpcContext.Key, but what would be the best way without GrpcContext.Key?

sgc109 avatar Oct 30 '23 13:10 sgc109

@sgc109 try it first in it's simplest form to see if this could work: use a Kotlin object. A global singleton to easily use the same coroutine context.

trietsch avatar Oct 31 '23 09:10 trietsch