
Add "one obvious way" for implementing the common multiplexed request/response pattern

Open: njsmith opened this issue 6 years ago • 30 comments

Here's a very common pattern: we have a connection to some kind of peer. Multiple tasks send requests via the connection, and wait for responses. One background task reads from the connection, and dispatches the responses back to the appropriate requesting tasks.

Examples include WAMP (I think @agronholm ran into this), @sscherfke's aiomas channels, this SO question from @lilydjwg, gRPC, any HTTP client library that wants to support HTTP/2, etc.

It's certainly possible to do this kind of thing from Trio, but there are enough subtleties, and it's a common enough pattern, that I think it's worth thinking it through once and working out the best solution, and then adding helpers to trio so it becomes The One Obvious Way To Do It. Also the docs should have an explicit tutorial about this. (Or maybe a tutorial will turn out to be sufficient.)

Some issues to consider:

I used to recommend that people directly use wait_task_rescheduled inside the await rpc_call(...) function, since conceptually it seems like exactly the right thing: it puts a task to sleep, waits for a single value/error to come back, and nudges the user to think about cancellation. BUT, in realistic cases this can create a race condition: after we send the request, it's theoretically possible for the response to come back before we can call wait_task_rescheduled, and then everything falls apart. The underlying problem here is that wait_task_rescheduled uses the task object itself as the key to wake it up again, and since tasks sleep many times over their lifetime, this creates an ambiguity where we can't tell which wakeup goes with which sleep unless we make sure that they're always in lockstep. So I guess one potential option would be to change the low-level sleep/wakeup APIs to use a different key, one that's unique for each sleep? There's nothing terribly magic about the task object, but it does have the advantage that it's an existing object that we have to stash in thread-local storage anyway, whereas creating a unique key for each sleep probably adds a tiny bit more overhead. OTOH, maybe it doesn't matter...?
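For concreteness, here's roughly what that approach looks like, spelled with the current trio.lowlevel names (at the time this was trio.hazmat); self._pending, self._stream, and the encoding helpers are all hypothetical, and this is a sketch of the racy pattern, not a recommendation:

    # Sketch of the wait_task_rescheduled() pattern and where it goes wrong.
    # The race: while this task is still sleeping inside send_all(), the reader
    # task may already see the response and want to call reschedule() -- but
    # reschedule() is keyed on the task object, so it can't tell "sleeping in
    # send_all()" apart from "sleeping in wait_task_rescheduled()".
    import trio

    async def run_remote(self, fn, *args):
        request_id = next(self._request_counter)
        self._pending[request_id] = trio.lowlevel.current_task()
        await self._stream.send_all(self._encode_request(request_id, fn, *args))
        # <-- if the reader task handles the response while we're still inside
        #     send_all(), there's no clean way for it to park the result

        def abort_fn(raise_cancel):
            del self._pending[request_id]
            return trio.lowlevel.Abort.SUCCEEDED

        return await trio.lowlevel.wait_task_rescheduled(abort_fn)

    # Reader-task side, roughly:
    #     task = self._pending.pop(msg.request_id)
    #     trio.lowlevel.reschedule(task, outcome.Value(msg.payload))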

We don't really want something like Result.capture here, because we don't want to catch random exceptions that happen inside the reader task (especially things like Cancelled!!); we only want to create values/errors based on the data read from the remote peer. OTOH maybe taking a Result is fine and we just trust people to use the Error/Value constructors directly. Or since Result is in hazmat, maybe we want to (also) provide direct set_value/set_error methods so people don't have to know about it just to use this. [Edit: some of this doesn't apply anymore now that we've split off Result into its own library (renaming it to Outcome in the process).]
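To make the distinction concrete, here's roughly what the reader task's dispatch might look like without capture (a sketch; msg, RemoteCallError, and the wire format are made-up names):

    # Hypothetical dispatch snippet inside the reader task.  Something like
    # outcome.capture(handle, msg) would turn *any* exception raised while
    # handling the message into the request's result (in the worst case even a
    # stray Cancelled); building the outcome directly from the decoded message
    # only reflects what the remote peer actually told us.
    import outcome

    def result_from_message(msg):
        if msg["status"] == "ok":
            return outcome.Value(msg["value"])
        else:
            return outcome.Error(RemoteCallError(msg["error"]))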

The "obvious" answer is a Future. But this isn't actually particularly closely matched to this use case: for example, a Future is multi-consumer, while in this case we know there's only one consumer. A Future is effectively an Event+Result; in some sense what we want here is something more like a single-use Queue+Result. Where this different particularly matters is cancellation: when we know there's only a single consumer, and it gets cancelled, that's different than if one of potentially multiple consumers gets cancelled.

Maybe we'll end up with a Future at the end of this after all, but I want to explore the larger space first, especially since we've all been brainwashed into seeing Futures as the solution to everything and it's sometimes hard to tell when it's actually true and when it's when-all-you-have-is-a-hammer syndrome. Also, maybe Futures are just the wrong level of abstraction entirely. Maybe we should be thinking of a piece of reusable code to capture this pattern and take care of all the plumbing, sort of like serve_listeners encapsulates a common pattern.

In general, cancellation is a big issue here.

In fact, cancellation is a problem even before we worry about handling the response. The most straightforward way to implement a generic RPC mechanism would be to have an API like await conn.run_remote(fn, *args), and then run_remote encodes the request, sends it on the underlying stream, and then reads the response.

On the requesting side, if you just do the straightforward thing like:

async def run_remote(self, fn, *args):
    async with self._send_lock:
        request_id = next(self._request_counter)
        request_data = self._encode_request(request_id, fn, *args)
        await self._stream.send_all(request_data)
    return await self._somehow_get_response(request_id)

Then you're already in trouble with respect to cancellation: if you get cancelled in the middle of send_all, then you might have sent only half your request, and now instead of cancelling a single request on this connection, you've just corrupted the connection entirely. That's no good at all.

One solution would be to use shielding, though it's still tricky: you do want the send_all to be cancellable in some cases (e.g. if the user hits control-C). Just not if the only thing being cancelled is this particular request. I guess you could use shielding, + make the receive loop close the stream if it's cancelled, and then #460 will kick in and cause send_all to exit early? Maybe you want all those other pieces anyway?
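A rough sketch of how that could look (not a recommendation, just the shape of it):

    # The shield keeps a per-request cancellation (e.g. a timeout wrapped
    # around run_remote) from cutting send_all() off halfway and corrupting the
    # stream.  The escape hatch for "tear down the whole connection" is that
    # the receive loop closes the stream when it's cancelled, which (per #460)
    # should make a pending send_all() raise promptly instead of hanging.
    async def run_remote(self, fn, *args):
        async with self._send_lock:
            request_id = next(self._request_counter)
            request_data = self._encode_request(request_id, fn, *args)
            with trio.CancelScope(shield=True):
                await self._stream.send_all(request_data)
        return await self._somehow_get_response(request_id)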

When the receive loop is cancelled, you also want to immediately resolve all outstanding requests with some kind of ConnectionLost error.
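Concretely, the receive loop's teardown might look roughly like this (ConnectionLost, _pending, and the decoding helper are hypothetical names; set_value/set_error are in the spirit of the single-consumer object discussed above):

    # Hypothetical receive-loop teardown: on the way out (cancellation, EOF, or
    # a protocol error), close the stream -- which also unblocks any shielded
    # send_all() -- and fail every request still waiting for a response.
    async def _receive_loop(self):
        try:
            while True:
                response = await self._read_and_decode_one()
                waiter = self._pending.pop(response.request_id, None)
                if waiter is not None:  # None: request was already abandoned
                    waiter.set_value(response.payload)
        finally:
            with trio.CancelScope(shield=True):
                await self._stream.aclose()
            for waiter in self._pending.values():
                waiter.set_error(ConnectionLost("connection lost before a response arrived"))
            self._pending.clear()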

Alternatively, we could add a second background task, for our send loop; by assumption, we already have a background task for our receive loop, so adding a second task doesn't change our user-visible API. Then our request function could be implemented as something like:

async def run_remote(self, fn, *args):
    request_id = next(self._request_counter)
    request_data = self._encode_request(request_id, fn, *args)
    await self._send_queue.put(request_data)
    return await self._somehow_get_response(request_id)

This nicely removes the lock and the original cancellation hazard: the only things Queue.put can do are return normally or raise an error, and in the former case it's always succeeded and in the latter case it's always a no-op.
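The sender task this version assumes is short (a sketch, using the same trio.Queue style as the snippet above; newer trio versions have since replaced Queue with memory channels):

    # Sketch of the corresponding sender task: it serializes access to the
    # stream, so no lock is needed, and only *it* can ever be cancelled
    # mid-send -- which is the "tear down the whole connection" case anyway.
    async def _send_loop(self):
        while True:
            request_data = await self._send_queue.get()
            await self._stream.send_all(request_data)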

However, this particular formulation assumes that our protocol is composed of totally independent messages, where for each message we can assign it an id, encode it, and then drop or re-order it, independently of all the others. For many protocols, that's not true -- maybe ids are constrained to be sequential, or the messages contain a sequence number, so you need to make sure that the calls to encode happen in the right order, and once you've encoded the message then you're committed to sending it. In these cases you'd want to put the encoding logic inside the sender task / inside the shield. And in the sender task case, this means you either need some way to get the request_id back from the sender task, or else the run_remote function needs to create an object representing the response and hand it off to the sender task, like:

async def run_remote(self, fn, *args):
    response = OneShotFuture()
    await self._send_queue.put((response, fn, args))
    return await response.get()  # Still todo: cancellation handling here
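One plausible shape for that OneShotFuture object, built from trio.Event plus the outcome library (just a sketch; the real design question is what get() should do on cancellation, which is the subject of the rest of this issue):

    # Single-producer, single-consumer one-shot: the sender/reader tasks call
    # set_value()/set_error() exactly once, the requesting task calls get() once.
    import outcome
    import trio

    class OneShotFuture:
        def __init__(self):
            self._event = trio.Event()
            self._outcome = None

        def set_value(self, value):
            self._outcome = outcome.Value(value)
            self._event.set()

        def set_error(self, exception):
            self._outcome = outcome.Error(exception)
            self._event.set()

        async def get(self):
            await self._event.wait()  # cancellation semantics deliberately unresolved
            return self._outcome.unwrap()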

OK. So we've actually sent our request. Now what happens if our request function gets cancelled? There are a few different semantics that might make sense.

If the operation is side-effect free, we might want to abandon it: let run_remote return immediately (raising Cancelled), and when/if a response does come back, throw it away. Alternatively, we might want to follow Trio's normal rules for cancellation, and wait for the response to come back regardless. (Since our normal rule is that cancelled means that the operation was actually cancelled and didn't happen, or at least didn't happen completely.)

Then there's a second, orthogonal question: do we somehow notify the remote peer of the cancellation? For example, WAMP's RPC system has a "cancel that RPC request" message, and if our remote peer is also using trio then it could straightforwardly convert this back into a trio cancellation on the remote side. But there are other protocols that don't have this feature.

I say these are orthogonal because they form a 2x2 square of plausible behaviors: you could abandon + don't notify, you could abandon + send a cancel request (as a courtesy -- you'll throw the response away in any case, but maybe the other side can save some processing), you could wait + don't notify (sorry, you're just stuck waiting, similar to run_sync_in_worker_thread), or you could wait + notify. In the last two cases, if a peer is totally locked-up and not responding, then you can still escape by closing the connection entirely / cancelling the nursery that's holding the receive task, but timeouts on individual calls will be ineffective.
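To make one quadrant concrete, "abandon + send a cancel request" might look roughly like this at the point where we wait for the response (a sketch assuming the variant where run_remote knows its request_id; self._abandoned is a hypothetical set the reader task uses to drop late responses):

    # Sketch of "abandon + notify": on cancellation, fire off a best-effort
    # cancel message and remember the id so the reader task can discard any
    # response that arrives later, then let Cancelled propagate as usual.
    try:
        return await response.get()
    except trio.Cancelled:
        self._abandoned.add(request_id)
        # We're already cancelled, so an unshielded await here would just
        # raise Cancelled again before the cancel message could be queued.
        with trio.CancelScope(shield=True):
            await self._send_queue.put(("cancel", request_id))
        raise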

This is all very tricky: you want timeouts to be reliably enforced without being at the whims of a remote peer, and you also want to know whether a cancelled call actually happened or not, and in a distributed system these are incompatible – when everything is working normally you can have both, but after a partition you have to accept that some requests are in an indeterminate state. So at some level we just have to expose this fundamental reality rather than try to fix it, but it's tricky to do. Maybe this should be configurable on individual calls? Maybe you need two timeouts, one for the "send a cancel" behavior and one for the "give up entirely" behavior? Maybe the "give up entirely" is more of a per-connection feature than a per-request feature, so you might want per-call soft cancellation, and then an underlying heartbeat timer at the connection level where if there's no data for X seconds then the connection closes? I notice I've started talking about these as "hard" and "soft" cancellation – maybe there's some connection to #147? And should there be a different error for "successful cancel, confirmed by the remote peer" versus "couldn't talk to the remote peer, I have no idea what happened"?

We can implement all these different options, but it may be more or less awkward depending on what overall architecture we use.

Yet another complication: streaming, as in HTTP/2 downloads or gRPC's streaming RPC features. Maybe this isn't really different and can be essentially modeled as request/response, because handling backpressure safely means that streams have to wait until the receiver opens its window (i.e., requests more data)? Of course in many cases you'll want some buffering, which basically means sending off the next request optimistically before we actually need it.

Question: how do the implementation patterns for queue protocols like AMQP or Kafka compare to this multiplexed request/response pattern?

njsmith · Mar 14 '18 08:03