purerpc icon indicating copy to clipboard operation
purerpc copied to clipboard

Broken Pipe on long running Service Stub

Open penguin138 opened this issue 6 years ago • 1 comments

When trying to call method of long running service, BrokenPipeError is raised due to the fact that connection that was used to send data was broken and wasn't restarted. For example, for the following service defined in my_service.proto:

syntax = "proto3"
service MyService {
    rpc MyMethod (InputMessage) returns (OutputMessage);
}

and the following python script my_script.py:

import curio
from purerpc import Channel
from my_service_grpc import MyServiceStub
from my_service_pb2 import InputMessage


async def main():
    channel = Channel("localhost", 8000)
    my_service_stub = MyServiceStub(channel)
    request =InputMessage()
    response = await my_service_stub.MyMethod(request)

if __name__ == "__main__":
    curio.run(main)

we have the following traceback:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/purerpc/server.py", line 132, in request_received
    await call_server_stream_stream(method_fn, stream)
  File "/usr/local/lib/python3.6/site-packages/purerpc/wrappers.py", line 74, in call_server_stream_stream
    await send_multiple_messages_server(stream, func(input_message_stream))
  File "/usr/local/lib/python3.6/site-packages/purerpc/wrappers.py", line 33, in send_multiple_messages_server
    async for message in tmp:
  File "/usr/local/lib/python3.6/site-packages/my_script.py", line 11, in MyMethod
    response = await my_service_stub.MyMethod(request)
  File "/usr/local/lib/python3.6/site-packages/purerpc/wrappers.py", line 84, in __call__
    stream = await self._stream_fn()
  File "/usr/local/lib/python3.6/site-packages/purerpc/client.py", line 35, in rpc
    self.channel.port))
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpc_proto.py", line 48, in start_request
    content_type_suffix if content_type_suffix else "+proto", custom_metadata
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpc_socket.py", line 231, in start_request
    await self._socket.flush()
  File "/usr/local/lib/python3.6/site-packages/purerpc/grpc_socket.py", line 32, in flush
    await self._socket.sendall(data)
  File "/usr/local/lib/python3.6/site-packages/curio/io.py", line 179, in sendall
    nsent = self._socket_send(buffer, flags)
BrokenPipeError: [Errno 32] Broken pipe

penguin138 avatar Jun 19 '18 13:06 penguin138

We need to properly detect underlying transport failures, and restart connection if there are any, but until then I added a small fix that enables SO_KEEPALIVE and sets TCP_KEEPINTVL and TCP_KEEPIDLE to smaller values, that should help cope with this issue.

standy66 avatar Jul 12 '18 03:07 standy66