mypy-protobuf icon indicating copy to clipboard operation
mypy-protobuf copied to clipboard

A better way to annotation the AsyncStub

Open RuofengX opened this issue 1 year ago • 12 comments

I am writing async code.

The proto file is like these:

service Continuum{
  rpc Tick(TickRequest) returns (HistoryResult);
}

The pyi file is partially like these:

class ContinuumStub:
    def __init__(self, channel: typing.Union[grpc.Channel, grpc.aio.Channel]) -> None: ...
    Tick: grpc.UnaryUnaryMultiCallable[
        essence.service_pb2.TickRequest,
        essence.service_pb2.HistoryResult,
    ]

class ContinuumAsyncStub:
    Tick: grpc.aio.UnaryUnaryMultiCallable[
        essence.service_pb2.TickRequest,
        essence.service_pb2.HistoryResult,
    ]

The ContinuumStub has 2 different params for its init function, which are Channel and aio.Channel. When using aio.Channel, the Stub instance need a type:ignore or typing.cast to convert to AsyncStub, like:

self._channel =grpc.aio.insecure_channel(
            target=self.target,
        )
stub: service_grpc.ContinuumAsyncStub = service_grpc.ContinuumStub(
    self._channel
)  # type:ignore

typing.cast is not working here because there isn't a true AsyncStub here, AsyncStub only exists in pyi file.

To solve this problem, maybe using Generic or overload the Tick function in pyi file? Any idea?

RuofengX avatar Jul 22 '23 01:07 RuofengX

Like these:

class ContinuumStub:
    def __init__(self, channel: typing.Union[grpc.Channel, grpc.aio.Channel]) -> None: ...
    Tick: grpc.UnaryUnaryMultiCallable[
        essence.service_pb2.TickRequest,
        essence.service_pb2.HistoryResult,
    ] | grpc.aio.UnaryUnaryMultiCallable[
        essence.service_pb2.TickRequest,
        essence.service_pb2.HistoryResult,
    ]

But it didn't seem like the best practise, because user could use a await SyncStub.Tick(), it should raise a typing error. Is there a way to overload the whole class?

RuofengX avatar Jul 22 '23 01:07 RuofengX

Faced same problems with async support.

If I try to run the code that imports my MyServiceAsyncStub python fails with AttributeError: module 'my_pb2_grpc' has no attribute 'MyServiceAsyncStub'.

The only workaround is to defined custom factory function with type string and add type ignore for return statement.

def create_my_stub(channel: grpc.aio.Channel) -> "my_pb2_grpc.MyServiceAsyncStub":
    return my_pb2_grpc.MyServiceStub(channel)  # type: ignore[return-value]

Then use this factory function in your code with normal mypy support.

P.S. I think that mypy protobuf plugin should generate one function that may return either sync stub or async stub according to channel type in input.


class _GreeterServiceSyncStub:
    Greet: grpc.UnaryUnaryMultiCallable[
        my_pb2.GreetRequest,
        my_pb2.GreetResponse,
    ]


class _GreeterServiceAsyncStub:
    Greet: grpc.aio.UnaryUnaryMultiCallable[
        my_pb2.GreetRequest,
        my_pb2.GreetResponse,
    ]


@typing.overload
def GreeterServiceStub(channel: grpc.Channel) -> _GreeterServiceSyncStub:
    ...


@typing.overload
def GreeterServiceStub(channel: grpc.aio.Channel) -> _GreeterServiceAsyncStub:
    ...

zerlok avatar Apr 15 '24 16:04 zerlok

seems reasonable! Would take an improvement as a PR if you can get it working.

nipunn1313 avatar Apr 23 '24 17:04 nipunn1313

I tried the above out here: https://github.com/artificialinc/mypy-protobuf/tree/aidan/async-stub-overload

It didn't really work. As far as I can tell you can't overload classes, and without the proper runtime definition of that function, it falls apart in certain ways.

It works in the simple test case, but because it isn't typed as a class things get weird when passing the type around. And so you end up with an object that is a class, typed as a function.

artificial-aidan avatar May 07 '24 21:05 artificial-aidan

But I did try something else that seems promising. Using the above example, this is what gets generated:

_MTVContinuum0 = typing.TypeVar(
    '_MTVContinuum0',
    grpc.UnaryUnaryMultiCallable[
        testproto.grpc.dummy_pb2.TickRequest,
        testproto.grpc.dummy_pb2.HistoryResult,
    ],
    grpc.aio.UnaryUnaryMultiCallable[
        testproto.grpc.dummy_pb2.TickRequest,
        testproto.grpc.dummy_pb2.HistoryResult,
    ],
)

class ContinuumStub(typing.Generic[_MTVContinuum0]):
    @typing.overload
    def __init__(self: ContinuumStub[
        grpc.UnaryUnaryMultiCallable[
            testproto.grpc.dummy_pb2.TickRequest,
            testproto.grpc.dummy_pb2.HistoryResult,
        ],
    ], channel: grpc.Channel) -> None: ...

    @typing.overload
    def __init__(self: ContinuumStub[
        grpc.aio.UnaryUnaryMultiCallable[
            testproto.grpc.dummy_pb2.TickRequest,
            testproto.grpc.dummy_pb2.HistoryResult,
        ],
    ], channel: grpc.aio.Channel) -> None: ...

    Tick: _MTVContinuum0

The overloaded init methods set the generic type on the stub. So when an asyncio channel is passed in, it sets the correct Type vars.

Pushed to the same branch. Will do some more testing

artificial-aidan avatar May 07 '24 23:05 artificial-aidan

Example of code for the DummyService

_MTVDummyService0 = typing.TypeVar(
    '_MTVDummyService0',
    grpc.UnaryUnaryMultiCallable[
        testproto.grpc.dummy_pb2.DummyRequest,
        testproto.grpc.dummy_pb2.DummyReply,
    ],
    grpc.aio.UnaryUnaryMultiCallable[
        testproto.grpc.dummy_pb2.DummyRequest,
        testproto.grpc.dummy_pb2.DummyReply,
    ],
)

_MTVDummyService1 = typing.TypeVar(
    '_MTVDummyService1',
    grpc.UnaryStreamMultiCallable[
        testproto.grpc.dummy_pb2.DummyRequest,
        testproto.grpc.dummy_pb2.DummyReply,
    ],
    grpc.aio.UnaryStreamMultiCallable[
        testproto.grpc.dummy_pb2.DummyRequest,
        testproto.grpc.dummy_pb2.DummyReply,
    ],
)

_MTVDummyService2 = typing.TypeVar(
    '_MTVDummyService2',
    grpc.StreamUnaryMultiCallable[
        testproto.grpc.dummy_pb2.DummyRequest,
        testproto.grpc.dummy_pb2.DummyReply,
    ],
    grpc.aio.StreamUnaryMultiCallable[
        testproto.grpc.dummy_pb2.DummyRequest,
        testproto.grpc.dummy_pb2.DummyReply,
    ],
)

_MTVDummyService3 = typing.TypeVar(
    '_MTVDummyService3',
    grpc.StreamStreamMultiCallable[
        testproto.grpc.dummy_pb2.DummyRequest,
        testproto.grpc.dummy_pb2.DummyReply,
    ],
    grpc.aio.StreamStreamMultiCallable[
        testproto.grpc.dummy_pb2.DummyRequest,
        testproto.grpc.dummy_pb2.DummyReply,
    ],
)

class DummyServiceStub(typing.Generic[_MTVDummyService0,_MTVDummyService1,_MTVDummyService2,_MTVDummyService3]):
    """DummyService"""

    @typing.overload
    def __init__(self: DummyServiceStub[
        grpc.UnaryUnaryMultiCallable[
            testproto.grpc.dummy_pb2.DummyRequest,
            testproto.grpc.dummy_pb2.DummyReply,
        ],
        grpc.UnaryStreamMultiCallable[
            testproto.grpc.dummy_pb2.DummyRequest,
            testproto.grpc.dummy_pb2.DummyReply,
        ],
        grpc.StreamUnaryMultiCallable[
            testproto.grpc.dummy_pb2.DummyRequest,
            testproto.grpc.dummy_pb2.DummyReply,
        ],
        grpc.StreamStreamMultiCallable[
            testproto.grpc.dummy_pb2.DummyRequest,
            testproto.grpc.dummy_pb2.DummyReply,
        ],
    ], channel: grpc.Channel) -> None: ...

    @typing.overload
    def __init__(self: DummyServiceStub[
        grpc.aio.UnaryUnaryMultiCallable[
            testproto.grpc.dummy_pb2.DummyRequest,
            testproto.grpc.dummy_pb2.DummyReply,
        ],
        grpc.aio.UnaryStreamMultiCallable[
            testproto.grpc.dummy_pb2.DummyRequest,
            testproto.grpc.dummy_pb2.DummyReply,
        ],
        grpc.aio.StreamUnaryMultiCallable[
            testproto.grpc.dummy_pb2.DummyRequest,
            testproto.grpc.dummy_pb2.DummyReply,
        ],
        grpc.aio.StreamStreamMultiCallable[
            testproto.grpc.dummy_pb2.DummyRequest,
            testproto.grpc.dummy_pb2.DummyReply,
        ],
    ], channel: grpc.aio.Channel) -> None: ...

    UnaryUnary: _MTVDummyService0
    """UnaryUnary"""

    UnaryStream: _MTVDummyService1
    """UnaryStream"""

    StreamUnary: _MTVDummyService2
    """StreamUnary"""

    StreamStream: _MTVDummyService3
    """StreamStream"""

artificial-aidan avatar May 07 '24 23:05 artificial-aidan

Bummer, this also falls apart when you are trying to use the type of the Stub.

stub: DummyServiceStub

reveal_type(stub.UnaryUnary)

^ type is Unknown

artificial-aidan avatar May 07 '24 23:05 artificial-aidan

Ok, deeper down the rabbit hole. Using TypeVar default parameter, and a type alias I think I've restored existing behavior when explicitly typing something, and when inferring types i've made an improvement

    class Test:
        stub: dummy_pb2_grpc.DummyServiceStub
        async_stub: dummy_pb2_grpc.DummyServiceAsyncStub

    t = Test()
    reveal_type(t.stub)
    # note: Revealed type is "testproto.grpc.dummy_pb2_grpc.DummyServiceStub[grpc.UnaryUnaryMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply], grpc.UnaryStreamMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply], grpc.StreamUnaryMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply], grpc.StreamStreamMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply]]"
    reveal_type(t.stub.UnaryUnary)
    # note: Revealed type is "grpc.UnaryUnaryMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply]"
    reveal_type(t.async_stub)
    # note: Revealed type is "testproto.grpc.dummy_pb2_grpc.DummyServiceStub[grpc.aio.UnaryUnaryMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply], grpc.aio.UnaryStreamMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply], grpc.aio.StreamUnaryMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply], grpc.aio.StreamStreamMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply]]"
    reveal_type(t.async_stub.UnaryUnary)
    #  note: Revealed type is "grpc.aio.UnaryUnaryMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply]"

    client = dummy_pb2_grpc.DummyServiceStub(grpc.insecure_channel(ADDRESS))
    reveal_type(client)
    # note: Revealed type is "testproto.grpc.dummy_pb2_grpc.DummyServiceStub[grpc.UnaryUnaryMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply], grpc.UnaryStreamMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply], grpc.StreamUnaryMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply], grpc.StreamStreamMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply]]"

    reveal_type(client.UnaryUnary)
    # note: Revealed type is "grpc.UnaryUnaryMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply]"
    async_client = dummy_pb2_grpc.DummyServiceStub(grpc.aio.insecure_channel(ADDRESS))
    reveal_type(async_client)
    #  note: Revealed type is "testproto.grpc.dummy_pb2_grpc.DummyServiceStub[grpc.aio.UnaryUnaryMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply], grpc.aio.UnaryStreamMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply], grpc.aio.StreamUnaryMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply], grpc.aio.StreamStreamMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply]]"
    reveal_type(async_client.UnaryUnary)
    # note: Revealed type is "grpc.aio.UnaryUnaryMultiCallable[testproto.grpc.dummy_pb2.DummyRequest, testproto.grpc.dummy_pb2.DummyReply]"

artificial-aidan avatar May 08 '24 00:05 artificial-aidan

Ok, I think I actually have something that works.

The thing that caught me up was I had to quote the AsyncStub when using it to type in certain places.

    def __init__(self, stub: 'service_test_pb2_grpc.TestingAsyncStub', local_metadata: grpc.aio.Metadata):
        super().__init__(local_metadata)
        self._test_grpc_stub: 'service_test_pb2_grpc.TestingAsyncStub' = stub

artificial-aidan avatar May 08 '24 03:05 artificial-aidan

After some time I decided to implement my own stub generator https://github.com/zerlok/pyprotostuben . It allows you to choose to generate either async or sync stubs. There better support with HasField & WhichOneof + more clear required parameters in constructor.

zerlok avatar Aug 22 '24 22:08 zerlok

After some time I decided to implement my own stub generator zerlok/pyprotostuben . It allows you to choose to generate either async or sync stubs. There better support with HasField & WhichOneof + more clear required parameters in constructor.

I was about to suggest this as a solution, since most people will probably want to work either with sync or async stubs, but not both. Can we just have a protoc option to make the stub async or sync?

I would rather have an option in this project, which is trusted and widely maintained, than having to switch to some other stub gen.

llucax avatar Sep 05 '24 08:09 llucax

Here is a quick attempt at implementing this:

  • #631

I tried it out and seems to work well. If this solution would be accepted, I can finish up the PR, updating docs and adding tests. @nipunn1313

llucax avatar Sep 05 '24 09:09 llucax