opentelemetry-python-contrib

CancelledError does not propagate through aio client interceptors

Open — tonyhong007 opened this issue 9 months ago • 0 comments

Describe your environment

OS: Ubuntu
Python version: Python 3.10.16
Package version: opentelemetry-instrumentation-grpc 0.50b0

What happened?

When I create a gRPC channel with the aio client interceptors, start an RPC inside a task created with asyncio.create_task(), and subsequently call .cancel() on that task, I am unable to catch the asyncio.CancelledError that should be propagated to the RPC: the cancellation never reaches the server-side handler. The issue occurs for both unary-unary and stream-unary calls.

Steps to Reproduce

I have created toy scripts for both the unary unary and stream unary cases to demonstrate this issue.

example.proto

syntax = "proto3";

package example;

// Service used by the reproduction scripts: one unary-unary RPC and one
// stream-unary (client-streaming) RPC.
service ExampleService {
  // Unary-unary: one HelloRequest in, one HelloReply out.
  rpc SayHello (HelloRequest) returns (HelloReply);

  // Stream-unary: a stream of WhatsUpRequest messages in, one WhatsUpReply out.
  rpc SayWhatsUp(stream WhatsUpRequest) returns (WhatsUpReply);
}

// Request message for SayHello.
message HelloRequest {
  string name = 1;  // Caller-supplied name.
}

// Reply message for SayHello.
message HelloReply {
  string message = 1;  // Text returned to the caller.
}

// One element of the SayWhatsUp request stream.
message WhatsUpRequest {
  string name = 1;  // Caller-supplied name.
}

// Single reply message for SayWhatsUp.
message WhatsUpReply {
  string message = 1;  // Text returned to the caller.
}

unary_unary_client.py

import asyncio
import grpc
from opentelemetry.instrumentation.grpc import aio_client_interceptors

from example_pb2 import HelloRequest
from example_pb2_grpc import ExampleServiceStub

async def make_hello_rpc_call(stub):
    """Invoke the unary-unary ``SayHello`` RPC once and print the reply text.

    Args:
        stub: Client stub exposing an awaitable ``SayHello`` call.
    """
    request = HelloRequest(name="TestUser")
    reply = await stub.SayHello(request)
    print("Server Response:", reply.message)

async def main():
    """Reproduce the issue for a unary-unary RPC: start it in a task, then cancel.

    Opens an insecure channel to localhost:50051 with the OpenTelemetry aio
    client interceptors installed, runs ``make_hello_rpc_call`` as a task,
    cancels that task after 4 seconds, and awaits it so the resulting
    ``asyncio.CancelledError`` is handled here.
    """
    # Create gRPC channel with aio_client_interceptors
    interceptors = aio_client_interceptors()
    async with grpc.aio.insecure_channel("localhost:50051", interceptors=interceptors) as channel:
        # Test unary unary
        stub = ExampleServiceStub(channel)
        rpc_task = asyncio.create_task(make_hello_rpc_call(stub))

        # Wait 4 seconds (presumably long enough for the RPC to reach the
        # server), then cancel the task that is awaiting the RPC.
        await asyncio.sleep(4)
        rpc_task.cancel()

        try:
            await rpc_task
        except asyncio.CancelledError:
            print("RPC task cancelled.")

        # Stay inside the channel context for a long time; presumably so the
        # channel is not closed while the server output is being observed.
        await asyncio.sleep(100000)

# Script entry point.
if __name__ == "__main__":
    asyncio.run(main())

unary_unary_server.py

import asyncio
import grpc
from example_pb2 import HelloReply
from example_pb2_grpc import ExampleServiceServicer, add_ExampleServiceServicer_to_server

class ExampleService(ExampleServiceServicer):
    """Servicer whose ``SayHello`` handler blocks until it is cancelled."""

    async def SayHello(self, request, context):
        """Log the request, then wait on an event that nothing here ever sets.

        Prints "Request was cancelled" and re-raises if the wait is interrupted
        by ``asyncio.CancelledError``.
        """
        print(f"Received request from: {request.name}")

        try:
            print("Waiting for event...")
            # A fresh Event that nothing else references, so this wait does
            # not finish on its own.
            await asyncio.Event().wait()
        except asyncio.CancelledError:
            print("Request was cancelled")
            raise

        # Only reached if the wait above returns.
        print("Event received, sending response...")

        return HelloReply(message=f"Hello, {request.name}!")

async def serve():
    """Start the example gRPC server on port 50051 and run until terminated."""
    grpc_server = grpc.aio.server()
    add_ExampleServiceServicer_to_server(ExampleService(), grpc_server)
    grpc_server.add_insecure_port("[::]:50051")
    print("gRPC Server running on port 50051...")
    await grpc_server.start()
    await grpc_server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(serve())

stream_unary_client.py

import asyncio
import grpc
from opentelemetry.instrumentation.grpc import aio_client_interceptors

from example_pb2 import WhatsUpRequest
from example_pb2_grpc import ExampleServiceStub

async def make_whats_up_rpc_call(stub):
    """Invoke the stream-unary ``SayWhatsUp`` RPC and print the reply text.

    Args:
        stub: Client stub exposing an awaitable ``SayWhatsUp`` call that
            accepts an async iterator of requests.
    """
    names = ("Alice", "Bob", "Charlie")

    async def outgoing_requests():
        # Announce each request right before yielding it to the RPC.
        for person in names:
            print(f"Sending request for {person}")
            yield WhatsUpRequest(name=person)

    reply = await stub.SayWhatsUp(outgoing_requests())
    print("Server Response:", reply.message)

async def main():
    """Reproduce the issue for a stream-unary RPC: start it in a task, then cancel.

    Opens an insecure channel to localhost:50051 with the OpenTelemetry aio
    client interceptors installed, runs ``make_whats_up_rpc_call`` as a task,
    cancels that task after 4 seconds, and awaits it so the resulting
    ``asyncio.CancelledError`` is handled here.
    """
    # Create gRPC channel with aio_client_interceptors
    interceptors = aio_client_interceptors()
    async with grpc.aio.insecure_channel("localhost:50051", interceptors=interceptors) as channel:
        # Test stream unary
        stub = ExampleServiceStub(channel)
        rpc_task = asyncio.create_task(make_whats_up_rpc_call(stub))

        # Wait 4 seconds (presumably long enough for the requests to reach
        # the server), then cancel the task that is awaiting the RPC.
        await asyncio.sleep(4)
        print("Canceling the RPC call...")
        rpc_task.cancel()

        try:
            await rpc_task
        except asyncio.CancelledError:
            print("RPC task cancelled.")

        # Stay inside the channel context for a long time; presumably so the
        # channel is not closed while the server output is being observed.
        await asyncio.sleep(100000)

# Script entry point.
if __name__ == "__main__":
    asyncio.run(main())

stream_unary_server.py

import asyncio
import grpc
from example_pb2 import WhatsUpReply
from example_pb2_grpc import ExampleServiceServicer, add_ExampleServiceServicer_to_server

class ExampleService(ExampleServiceServicer):
    """Servicer whose ``SayWhatsUp`` handler drains the request stream, then blocks."""

    async def SayWhatsUp(self, request_iterator, context):
        """Collect the name from every streamed request, then wait on an unset event.

        Prints "Request was cancelled" and re-raises if ``asyncio.CancelledError``
        is raised while reading the stream or while waiting.
        """
        try:
            names = []
            async for request in request_iterator:
                print(f"Received request from: {request.name}")
                names.append(request.name)

            print("Waiting for event...")
            # A fresh Event that nothing else references, so this wait does
            # not finish on its own.
            await asyncio.Event().wait()
        except asyncio.CancelledError:
            print("Request was cancelled")
            raise

        # Only reached if the wait above returns.
        print("All requests received, sending response...")
        return WhatsUpReply(message=f"Whats up, {', '.join(names)}!")

async def serve():
    """Start the example gRPC server on port 50051 and run until terminated."""
    grpc_server = grpc.aio.server()
    add_ExampleServiceServicer_to_server(ExampleService(), grpc_server)
    grpc_server.add_insecure_port("[::]:50051")
    print("gRPC Server running on port 50051...")
    await grpc_server.start()
    await grpc_server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(serve())

Expected Result

For the unary unary case, I expect to see these logs from the server:

gRPC Server running on port 50051...
Received request from: TestUser
Waiting for event...
Request was cancelled

For the stream unary case, I expect to see these logs from the server:

gRPC Server running on port 50051...
Received request from: Alice
Received request from: Bob
Received request from: Charlie
Waiting for event...
Request was cancelled

Actual Result

For the unary unary case, these are the logs I see from the server since the cancelled error does not propagate:

gRPC Server running on port 50051...
Received request from: TestUser
Waiting for event...

For the stream unary case, these are the logs I see from the server since the cancelled error does not propagate:

gRPC Server running on port 50051...
Received request from: Alice
Received request from: Bob
Received request from: Charlie
Waiting for event...

Additional context

I have found a workaround for both the unary unary and stream unary cases. The workaround is adding a dummy interceptor after the aio client interceptors to explicitly await the call. I have attached the updated client code with the workaround below.

unary_unary_client.py

import asyncio
import grpc
from opentelemetry.instrumentation.grpc import aio_client_interceptors

from example_pb2 import HelloRequest
from example_pb2_grpc import ExampleServiceStub

async def make_hello_rpc_call(stub):
    """Invoke the unary-unary ``SayHello`` RPC once and print the reply text.

    Args:
        stub: Client stub exposing an awaitable ``SayHello`` call.
    """
    request = HelloRequest(name="TestUser")
    reply = await stub.SayHello(request)
    print("Server Response:", reply.message)

class UnaryUnaryClientInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
    """Workaround interceptor that awaits the unary-unary call inside the interceptor."""

    async def intercept_unary_unary(
        self, continuation, client_call_details, request
    ):
        """Run the continuation, await the resulting call, and return the call.

        Prints progress messages along the way; on ``asyncio.CancelledError``
        prints a notice and re-raises.
        """
        print(f"Intercepting unary-unary call to {client_call_details.method}")
        try:
            # Proceed with the original call
            call = await continuation(client_call_details, request)

            # Explicitly await the call here instead of leaving it to the caller.
            response = await call
            print(
                f"Received response from {client_call_details.method}: {response}"
            )
            return call
        except asyncio.CancelledError:
            print(f"RPC call to {client_call_details.method} was cancelled.")
            raise

async def main():
    """Run the unary-unary cancellation scenario with the workaround interceptor.

    Same flow as the plain reproduction (start the RPC in a task, cancel it
    after 4 seconds, await the task), except that a
    ``UnaryUnaryClientInterceptor`` is appended after the OpenTelemetry aio
    client interceptors.
    """
    # Create gRPC channel with aio_client_interceptors
    interceptors = aio_client_interceptors() + [UnaryUnaryClientInterceptor()]
    async with grpc.aio.insecure_channel("localhost:50051", interceptors=interceptors) as channel:
        # Test unary unary
        stub = ExampleServiceStub(channel)
        rpc_task = asyncio.create_task(make_hello_rpc_call(stub))

        # Wait 4 seconds (presumably long enough for the RPC to reach the
        # server), then cancel the task that is awaiting the RPC.
        await asyncio.sleep(4)
        rpc_task.cancel()

        try:
            await rpc_task
        except asyncio.CancelledError:
            print("RPC task cancelled.")

        # Stay inside the channel context for a long time; presumably so the
        # channel is not closed while the server output is being observed.
        await asyncio.sleep(100000)

# Script entry point.
if __name__ == "__main__":
    asyncio.run(main())

stream_unary_client.py

import asyncio
import grpc
from opentelemetry.instrumentation.grpc import aio_client_interceptors

from example_pb2 import WhatsUpRequest
from example_pb2_grpc import ExampleServiceStub

async def make_whats_up_rpc_call(stub):
    """Invoke the stream-unary ``SayWhatsUp`` RPC and print the reply text.

    Args:
        stub: Client stub exposing an awaitable ``SayWhatsUp`` call that
            accepts an async iterator of requests.
    """
    names = ("Alice", "Bob", "Charlie")

    async def outgoing_requests():
        # Announce each request right before yielding it to the RPC.
        for person in names:
            print(f"Sending request for {person}")
            yield WhatsUpRequest(name=person)

    reply = await stub.SayWhatsUp(outgoing_requests())
    print("Server Response:", reply.message)

class StreamUnaryClientInterceptor(grpc.aio.StreamUnaryClientInterceptor):
    """Workaround interceptor that awaits the stream-unary call inside the interceptor."""

    async def intercept_stream_unary(
        self, continuation, client_call_details, request_iterator
    ):
        """Forward the request stream, await the resulting call, and return the call.

        Each outgoing request is logged before it is forwarded. On
        ``asyncio.CancelledError`` a notice is printed and the error re-raised.
        """
        print(f"Intercepting stream-unary call to {client_call_details.method}")

        # Wrap the caller's iterator so every outgoing request is logged.
        async def new_request_iterator():
            async for request in request_iterator:
                print(f"Sending request: {request.name}")
                yield request  # Forward the request

        try:
            # Proceed with the original call
            call = await continuation(client_call_details, new_request_iterator())

            # Explicitly await the call here instead of leaving it to the caller.
            response = await call

            print(f"Received response from {client_call_details.method}: {response}")
            return call
        except asyncio.CancelledError:
            print(f"RPC call to {client_call_details.method} was cancelled.")
            raise

async def main():
    """Run the stream-unary cancellation scenario with the workaround interceptor.

    Same flow as the plain reproduction (start the RPC in a task, cancel it
    after 4 seconds, await the task), except that a
    ``StreamUnaryClientInterceptor`` is appended after the OpenTelemetry aio
    client interceptors.
    """
    # Create gRPC channel with aio_client_interceptors
    interceptors = aio_client_interceptors() + [StreamUnaryClientInterceptor()]
    async with grpc.aio.insecure_channel("localhost:50051", interceptors=interceptors) as channel:
        # Test stream unary
        stub = ExampleServiceStub(channel)
        rpc_task = asyncio.create_task(make_whats_up_rpc_call(stub))

        # Wait 4 seconds (presumably long enough for the requests to reach
        # the server), then cancel the task that is awaiting the RPC.
        await asyncio.sleep(4)
        print("Canceling the RPC call...")
        rpc_task.cancel()

        try:
            await rpc_task
        except asyncio.CancelledError:
            print("RPC task cancelled.")

        # Stay inside the channel context for a long time; presumably so the
        # channel is not closed while the server output is being observed.
        await asyncio.sleep(100000)

# Script entry point.
if __name__ == "__main__":
    asyncio.run(main())

Would you like to implement a fix?

None

tonyhong007 avatar Feb 17 '25 01:02 tonyhong007