opentelemetry-python-contrib
opentelemetry-python-contrib copied to clipboard
CancelledError does not propagate through aio client interceptors
Describe your environment
OS: Ubuntu
Python version: Python 3.10.16
Package version: opentelemetry-instrumentation-grpc 0.50b0
What happened?
I am observing that when I create a gRPC channel with the aio client interceptors, make a call to an RPC using asyncio.create_task(), and subsequently call .cancel() on the task, I am unable to catch (`except`) the asyncio.CancelledError that should be propagated to the RPC. I notice this issue occurs for both unary-unary and stream-unary calls.
Steps to Reproduce
I have created toy scripts for both the unary unary and stream unary cases to demonstrate this issue.
example.proto
syntax = "proto3";

package example;

// Service used by the reproduction scripts.
service ExampleService {
  // Unary-unary RPC: one HelloRequest in, one HelloReply out.
  rpc SayHello (HelloRequest) returns (HelloReply);

  // Stream-unary RPC: a stream of WhatsUpRequest in, one WhatsUpReply out.
  rpc SayWhatsUp(stream WhatsUpRequest) returns (WhatsUpReply);
}

// Request for SayHello.
message HelloRequest {
  string name = 1;
}

// Response for SayHello.
message HelloReply {
  string message = 1;
}

// One element of the SayWhatsUp request stream.
message WhatsUpRequest {
  string name = 1;
}

// Response for SayWhatsUp.
message WhatsUpReply {
  string message = 1;
}
unary_unary_client.py
import asyncio
import grpc
from opentelemetry.instrumentation.grpc import aio_client_interceptors
from example_pb2 import HelloRequest
from example_pb2_grpc import ExampleServiceStub
async def make_hello_rpc_call(stub):
    """Issue one ``SayHello`` RPC for "TestUser" and print the reply message.

    Args:
        stub: object exposing an awaitable ``SayHello`` call (an
            ``ExampleServiceStub`` in this script).
    """
    hello_request = HelloRequest(name="TestUser")
    reply = await stub.SayHello(hello_request)
    print("Server Response:", reply.message)
async def main():
    """Start ``SayHello`` as a task on an instrumented channel, then cancel it.

    The channel is built with the interceptors returned by
    ``aio_client_interceptors()``. The RPC task is cancelled after four
    seconds and then awaited so that the resulting
    ``asyncio.CancelledError`` is reported on the client side.
    """
    # NOTE(review): the snippet's indentation was lost; the final long sleep
    # is assumed to sit inside the ``async with`` block -- confirm.
    otel_interceptors = aio_client_interceptors()
    async with grpc.aio.insecure_channel(
        "localhost:50051", interceptors=otel_interceptors
    ) as open_channel:
        # Unary-unary case.
        service_stub = ExampleServiceStub(open_channel)
        hello_task = asyncio.create_task(make_hello_rpc_call(service_stub))
        await asyncio.sleep(4)
        hello_task.cancel()
        try:
            await hello_task
        except asyncio.CancelledError:
            print("RPC task cancelled.")
        # Presumably keeps the client alive so the server log can be
        # inspected after the cancellation -- TODO confirm intent.
        await asyncio.sleep(100000)


if __name__ == "__main__":
    # Script entry point.
    asyncio.run(main())
unary_unary_server.py
import asyncio
import grpc
from example_pb2 import HelloReply
from example_pb2_grpc import ExampleServiceServicer, add_ExampleServiceServicer_to_server
class ExampleService(ExampleServiceServicer):
    """Servicer whose ``SayHello`` handler blocks until it is cancelled."""

    async def SayHello(self, request, context):
        """Log the request, then wait on an event that nothing ever sets.

        If the wait is cancelled, "Request was cancelled" is printed and the
        ``asyncio.CancelledError`` is re-raised.
        """
        print(f"Received request from: {request.name}")
        try:
            print("Waiting for event...")
            # A fresh Event with no other reference: nothing in this
            # snippet can set it.
            never_set = asyncio.Event()
            await never_set.wait()
        except asyncio.CancelledError:
            print("Request was cancelled")
            raise
        else:
            print("Event received, sending response...")
            return HelloReply(message=f"Hello, {request.name}!")
async def serve():
    """Run an insecure aio gRPC server with ``ExampleService`` on port 50051."""
    grpc_server = grpc.aio.server()
    servicer = ExampleService()
    add_ExampleServiceServicer_to_server(servicer, grpc_server)
    grpc_server.add_insecure_port("[::]:50051")
    print("gRPC Server running on port 50051...")
    await grpc_server.start()
    await grpc_server.wait_for_termination()


if __name__ == "__main__":
    # Script entry point.
    asyncio.run(serve())
stream_unary_client.py
import asyncio
import grpc
from opentelemetry.instrumentation.grpc import aio_client_interceptors
from example_pb2 import WhatsUpRequest
from example_pb2_grpc import ExampleServiceStub
async def make_whats_up_rpc_call(stub):
    """Stream three ``WhatsUpRequest`` messages to ``SayWhatsUp`` and print the reply.

    Args:
        stub: object exposing an awaitable ``SayWhatsUp`` call that accepts
            an async iterator of requests.
    """

    async def request_stream():
        # Yield one request per name, logging each before it is handed over.
        for person in ("Alice", "Bob", "Charlie"):
            print(f"Sending request for {person}")
            yield WhatsUpRequest(name=person)

    outgoing = request_stream()
    reply = await stub.SayWhatsUp(outgoing)
    print("Server Response:", reply.message)
async def main():
    """Start ``SayWhatsUp`` as a task on an instrumented channel, then cancel it.

    The channel is built with the interceptors returned by
    ``aio_client_interceptors()``. The RPC task is cancelled after four
    seconds and then awaited so that the resulting
    ``asyncio.CancelledError`` is reported on the client side.
    """
    # NOTE(review): the snippet's indentation was lost; the final long sleep
    # is assumed to sit inside the ``async with`` block -- confirm.
    otel_interceptors = aio_client_interceptors()
    async with grpc.aio.insecure_channel(
        "localhost:50051", interceptors=otel_interceptors
    ) as open_channel:
        # Stream-unary case.
        service_stub = ExampleServiceStub(open_channel)
        whats_up_task = asyncio.create_task(make_whats_up_rpc_call(service_stub))
        await asyncio.sleep(4)
        print("Canceling the RPC call...")
        whats_up_task.cancel()
        try:
            await whats_up_task
        except asyncio.CancelledError:
            print("RPC task cancelled.")
        # Presumably keeps the client alive so the server log can be
        # inspected after the cancellation -- TODO confirm intent.
        await asyncio.sleep(100000)


if __name__ == "__main__":
    # Script entry point.
    asyncio.run(main())
stream_unary_server.py
import asyncio
import grpc
from example_pb2 import WhatsUpReply
from example_pb2_grpc import ExampleServiceServicer, add_ExampleServiceServicer_to_server
class ExampleService(ExampleServiceServicer):
    """Servicer whose ``SayWhatsUp`` handler drains the stream, then blocks."""

    async def SayWhatsUp(self, request_iterator, context):
        """Collect every streamed name, then wait on an event nothing sets.

        If the handler is cancelled while reading or waiting,
        "Request was cancelled" is printed and the
        ``asyncio.CancelledError`` is re-raised.
        """
        names = []
        try:
            async for incoming in request_iterator:
                print(f"Received request from: {incoming.name}")
                names.append(incoming.name)
            print("Waiting for event...")
            # A fresh Event with no other reference: nothing in this
            # snippet can set it.
            never_set = asyncio.Event()
            await never_set.wait()
        except asyncio.CancelledError:
            print("Request was cancelled")
            raise
        else:
            print("All requests received, sending response...")
            return WhatsUpReply(message=f"Whats up, {', '.join(names)}!")
async def serve():
    """Run an insecure aio gRPC server with ``ExampleService`` on port 50051."""
    grpc_server = grpc.aio.server()
    servicer = ExampleService()
    add_ExampleServiceServicer_to_server(servicer, grpc_server)
    grpc_server.add_insecure_port("[::]:50051")
    print("gRPC Server running on port 50051...")
    await grpc_server.start()
    await grpc_server.wait_for_termination()


if __name__ == "__main__":
    # Script entry point.
    asyncio.run(serve())
Expected Result
For the unary unary case, I expect to see these logs from the server:
gRPC Server running on port 50051...
Received request from: TestUser
Waiting for event...
Request was cancelled
For the stream unary case, I expect to see these logs from the server:
gRPC Server running on port 50051...
Received request from: Alice
Received request from: Bob
Received request from: Charlie
Waiting for event...
Request was cancelled
Actual Result
For the unary unary case, these are the logs I see from the server since the cancelled error does not propagate:
gRPC Server running on port 50051...
Received request from: TestUser
Waiting for event...
For the stream unary case, these are the logs I see from the server since the cancelled error does not propagate:
gRPC Server running on port 50051...
Received request from: Alice
Received request from: Bob
Received request from: Charlie
Waiting for event...
Additional context
I have found a workaround for both the unary unary and stream unary cases. The workaround is adding a dummy interceptor after the aio client interceptors to explicitly await the call. I have attached the updated client code with the workaround below.
unary_unary_client.py
import asyncio
import grpc
from opentelemetry.instrumentation.grpc import aio_client_interceptors
from example_pb2 import HelloRequest
from example_pb2_grpc import ExampleServiceStub
async def make_hello_rpc_call(stub):
    """Issue one ``SayHello`` RPC for "TestUser" and print the reply message.

    Args:
        stub: object exposing an awaitable ``SayHello`` call (an
            ``ExampleServiceStub`` in this script).
    """
    hello_request = HelloRequest(name="TestUser")
    reply = await stub.SayHello(hello_request)
    print("Server Response:", reply.message)
class UnaryUnaryClientInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
    """Workaround interceptor that awaits the unary-unary call itself."""

    async def intercept_unary_unary(
        self, continuation, client_call_details, request
    ):
        """Invoke the continuation, await the resulting call, and return it.

        The call object (not the response) is returned. If awaiting is
        cancelled, a message is printed and ``asyncio.CancelledError`` is
        re-raised.
        """
        method = client_call_details.method
        print(f"Intercepting unary-unary call to {method}")
        try:
            # Hand off to the next interceptor / the real call.
            pending_call = await continuation(client_call_details, request)
            # Explicitly await the call inside the interceptor.
            reply = await pending_call
            print(f"Received response from {method}: {reply}")
            return pending_call
        except asyncio.CancelledError:
            print(f"RPC call to {method} was cancelled.")
            raise
async def main():
    """Run the unary-unary cancellation scenario with the workaround interceptor.

    The channel uses the interceptors from ``aio_client_interceptors()``
    followed by a ``UnaryUnaryClientInterceptor``. The RPC task is cancelled
    after four seconds and then awaited so that the resulting
    ``asyncio.CancelledError`` is reported on the client side.
    """
    # NOTE(review): the snippet's indentation was lost; the final long sleep
    # is assumed to sit inside the ``async with`` block -- confirm.
    interceptor_chain = aio_client_interceptors() + [UnaryUnaryClientInterceptor()]
    async with grpc.aio.insecure_channel(
        "localhost:50051", interceptors=interceptor_chain
    ) as open_channel:
        # Unary-unary case.
        service_stub = ExampleServiceStub(open_channel)
        hello_task = asyncio.create_task(make_hello_rpc_call(service_stub))
        await asyncio.sleep(4)
        hello_task.cancel()
        try:
            await hello_task
        except asyncio.CancelledError:
            print("RPC task cancelled.")
        # Presumably keeps the client alive so the server log can be
        # inspected after the cancellation -- TODO confirm intent.
        await asyncio.sleep(100000)


if __name__ == "__main__":
    # Script entry point.
    asyncio.run(main())
stream_unary_client.py
import asyncio
import grpc
from opentelemetry.instrumentation.grpc import aio_client_interceptors
from example_pb2 import WhatsUpRequest
from example_pb2_grpc import ExampleServiceStub
async def make_whats_up_rpc_call(stub):
    """Stream three ``WhatsUpRequest`` messages to ``SayWhatsUp`` and print the reply.

    Args:
        stub: object exposing an awaitable ``SayWhatsUp`` call that accepts
            an async iterator of requests.
    """

    async def request_stream():
        # Yield one request per name, logging each before it is handed over.
        for person in ("Alice", "Bob", "Charlie"):
            print(f"Sending request for {person}")
            yield WhatsUpRequest(name=person)

    outgoing = request_stream()
    reply = await stub.SayWhatsUp(outgoing)
    print("Server Response:", reply.message)
class StreamUnaryClientInterceptor(grpc.aio.StreamUnaryClientInterceptor):
    """Workaround interceptor that awaits the stream-unary call itself."""

    async def intercept_stream_unary(
        self, continuation, client_call_details, request_iterator
    ):
        """Forward the request stream, await the resulting call, and return it.

        Each outgoing request is logged before being forwarded. The call
        object (not the response) is returned. If awaiting is cancelled, a
        message is printed and ``asyncio.CancelledError`` is re-raised.
        """
        method = client_call_details.method
        print(f"Intercepting stream-unary call to {method}")

        async def logged_requests():
            # Pass every request through unchanged, logging its name first.
            async for outgoing in request_iterator:
                print(f"Sending request: {outgoing.name}")
                yield outgoing

        try:
            # Hand off to the next interceptor / the real call.
            pending_call = await continuation(client_call_details, logged_requests())
            # Explicitly await the call inside the interceptor.
            reply = await pending_call
            print(f"Received response from {method}: {reply}")
            return pending_call
        except asyncio.CancelledError:
            print(f"RPC call to {method} was cancelled.")
            raise
async def main():
    """Run the stream-unary cancellation scenario with the workaround interceptor.

    The channel uses the interceptors from ``aio_client_interceptors()``
    followed by a ``StreamUnaryClientInterceptor``. The RPC task is cancelled
    after four seconds and then awaited so that the resulting
    ``asyncio.CancelledError`` is reported on the client side.
    """
    # NOTE(review): the snippet's indentation was lost; the final long sleep
    # is assumed to sit inside the ``async with`` block -- confirm.
    interceptor_chain = aio_client_interceptors() + [StreamUnaryClientInterceptor()]
    async with grpc.aio.insecure_channel(
        "localhost:50051", interceptors=interceptor_chain
    ) as open_channel:
        # Stream-unary case.
        service_stub = ExampleServiceStub(open_channel)
        whats_up_task = asyncio.create_task(make_whats_up_rpc_call(service_stub))
        await asyncio.sleep(4)
        print("Canceling the RPC call...")
        whats_up_task.cancel()
        try:
            await whats_up_task
        except asyncio.CancelledError:
            print("RPC task cancelled.")
        # Presumably keeps the client alive so the server log can be
        # inspected after the cancellation -- TODO confirm intent.
        await asyncio.sleep(100000)


if __name__ == "__main__":
    # Script entry point.
    asyncio.run(main())
Would you like to implement a fix?
None