
Middleware for creating services from worker tasks

Open · saleemrashid opened this issue on Apr 27, 2020 · 3 comments

Summary

Tower should include middleware for creating services from worker tasks. This would be implemented similarly to Buffer, but the worker task would be user-provided. Buffer could be re-implemented on top of this new middleware.

Motivation

This is a fairly common use case that would be better served with a single, robust implementation included with Tower. For example, tokio_tower implements this for adapting Tokio streams/sinks to Tower services. Its codebase would be considerably smaller if this functionality were provided by Tower, as it would only have to include two small worker task implementations (one for pipeline, one for multiplex).

Beyond that, this functionality is necessary for any protocol where unsolicited messages have to be handled. For example, Zebra implements the Bitcoin protocol using Tower. It has a service for making requests to the peer, backed by a worker task which handles incoming messages from the peer and distinguishes between responses to outstanding requests and unsolicited requests from the peer.

In my own application, I am implementing a number of protocols that work very similarly. Additionally, I have had to implement a "BufferedRetry" middleware, which retries requests while maintaining request order: each request is retried to completion before the next one is sent. By contrast, a Buffer<Retry<S>> requires S: Clone and would dispatch all the requests immediately, retrying each one as and when it fails, without preserving the order in which the requests were made. This is essentially a Buffer with the Retry logic included in the worker task.
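To make that concrete, here is a minimal sketch of what such an ordered-retry worker loop could look like. This is hypothetical code, assuming tokio's mpsc/oneshot channels and tower's ServiceExt::ready; a real implementation would bound the retries and classify which errors are retryable.

use tokio::sync::{mpsc, oneshot};
use tower::{Service, ServiceExt};

// Each message pairs a request with the channel used to answer it.
type Message<Req, Res, Err> = (Req, oneshot::Sender<Result<Res, Err>>);

async fn buffered_retry_worker<S, Req>(
    mut service: S,
    mut rx: mpsc::Receiver<Message<Req, S::Response, S::Error>>,
) where
    S: Service<Req>,
    Req: Clone,
{
    // Take one request at a time and retry it to completion before
    // receiving the next one, so request order is preserved.
    while let Some((request, tx)) = rx.recv().await {
        let response = loop {
            match service.ready().await {
                // Naive policy: treat every call failure as retryable.
                Ok(svc) => match svc.call(request.clone()).await {
                    Ok(rsp) => break Ok(rsp),
                    Err(_) => continue,
                },
                Err(e) => break Err(e),
            }
        };
        // The caller may have given up; ignore a dropped receiver.
        let _ = tx.send(response);
    }
}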

Design

Below is a skeleton of what the WorkerTask trait could look like. I have no strong opinion on this, I am including it to kickstart the discussion.

use std::future::Future;
use tokio::sync::mpsc;

trait WorkerTask {
    type Request;
    type Response;
    type Error;
    // The type sent from the service to the task. This will include the `Request`
    // and a `oneshot::Sender` to return the response.
    type Message;
    // The `Future` for the entire worker task. I have not decided what
    // `Future::Output` should be; see below for the discussion on error handling.
    type Future: Future;

    fn execute(self, rx: mpsc::Receiver<Self::Message>) -> Self::Future;
}
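As a usage sketch (not part of the proposal), a trivial worker that answers every request immediately might implement the skeleton above as follows, assuming tokio channels and a boxed future:

use std::future::Future;
use std::pin::Pin;
use tokio::sync::{mpsc, oneshot};

struct EchoWorker;

impl WorkerTask for EchoWorker {
    type Request = String;
    type Response = String;
    type Error = std::convert::Infallible;
    // The request plus the channel on which to answer it.
    type Message = (String, oneshot::Sender<Result<String, std::convert::Infallible>>);
    type Future = Pin<Box<dyn Future<Output = ()> + Send>>;

    fn execute(self, mut rx: mpsc::Receiver<Self::Message>) -> Self::Future {
        Box::pin(async move {
            // A real worker would also select on its transport here, e.g.
            // to handle unsolicited messages from a peer.
            while let Some((request, tx)) = rx.recv().await {
                let _ = tx.send(Ok(request));
            }
        })
    }
}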

Custom Futures

The approach must consider that the worker task might not produce Result<Response, Error> directly. All the examples given above do return Result<Response, Error>, except for Buffer: its worker task returns a Future<Output = Result<Response, Error>> obtained from the inner service, which must then be driven to completion by the ResponseFuture.

One approach could be to receive a Future<Output = Result<Response, Error>> from the worker task. If the response is available immediately (i.e. as in all the given examples), futures::future::ready could be used, whereas Buffer would simply return the Future from the inner service.
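A hypothetical sketch of that shape, where the worker replies with a future instead of a value:

use std::convert::Infallible;
use futures::future::{self, Ready};
use tokio::sync::oneshot;

// The worker answers with a *future*; the service's ResponseFuture first
// awaits the oneshot, then drives the returned future to completion.
fn respond_immediately(
    tx: oneshot::Sender<Ready<Result<String, Infallible>>>,
    response: String,
) {
    // When the response is already at hand, wrap it in future::ready.
    let _ = tx.send(future::ready(Ok(response)));
}

// Buffer's worker would instead forward the inner service's future:
//     let _ = tx.send(inner.call(request));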

Another approach: rather than providing a full Service implementation, Tower could provide the facilities for implementing poll_ready and for handling errors from the worker task, leaving the implementation of call to the user.

Allowing the user to implement call themselves might be preferable: for example, many implementations will want to pass a tracing::Span to the worker task, and we should empower users to do this without forcing it on those who don't need or want it.
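For example, a hand-written call could capture the current span into a hypothetical message type like this:

use tokio::sync::oneshot;
use tracing::Span;

// Hypothetical message type for an implementation that wants the worker
// to run each request inside the caller's span.
struct Message<Req, Res, Err> {
    request: Req,
    // Captured with Span::current() in `call`; the worker enters it
    // before processing the request.
    span: Span,
    tx: oneshot::Sender<Result<Res, Err>>,
}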

Error Handling

We need to decide how to handle errors when the worker task terminates. Since the error will be sent to all future poll_ready calls and all outstanding requests, it will have to be Clone. We could use an Arc<E> to implement this ✨ automagically ✨, but it should probably be left to the user to provide an error type that is Clone (they can use Arc to do so if they want). For reference, Buffer uses a custom error type, ServiceError, which is effectively an Arc<BoxError>, but this won't be appropriate for all use cases.

If WorkerTask::Future::Output is Result<(), E>, then the worker task can use ? to terminate with an error, and we could propagate that error to poll_ready (using a similar approach to tower::buffer::worker::Handle or zebra_network::peer::error::ErrorSlot). However, if the worker task is processing a request when it returns with ?, the corresponding oneshot::Sender will be dropped. We could warn about this in the documentation, so users make sure to send the error to the request they're currently processing before using ?.
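A sketch of that documented pattern, with a hypothetical WorkerError that is Clone so it can also fan out to the remaining waiters:

use tokio::sync::{mpsc, oneshot};

#[derive(Clone, Debug)]
struct WorkerError(String);

type Msg = (String, oneshot::Sender<Result<String, WorkerError>>);

async fn worker(mut rx: mpsc::Receiver<Msg>) -> Result<(), WorkerError> {
    while let Some((request, tx)) = rx.recv().await {
        match process(&request).await {
            Ok(response) => {
                let _ = tx.send(Ok(response));
            }
            Err(e) => {
                // Answer the in-flight request first; returning with `?`
                // alone would silently drop its oneshot::Sender.
                let _ = tx.send(Err(e.clone()));
                return Err(e);
            }
        }
    }
    Ok(())
}

// Hypothetical fallible processing step.
async fn process(request: &str) -> Result<String, WorkerError> {
    Ok(request.to_owned())
}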

Finally, we need to decide how to handle the oneshot::Sender being dropped without a response. We could have an API contract that a WorkerTask must never do this (i.e. it should send a response to every request it takes from the mpsc::Receiver, and we then send the error returned from WorkerTask::execute to all the outstanding requests). For reference, Buffer uses a custom Closed error when the oneshot::Sender is dropped without explanation. Meanwhile, Zebra's implementation panics in this scenario (as do my own implementations).

saleemrashid commented on Apr 27, 2020

This seems quite reasonable. I am curious if you could explain the Zebra use case and your own?

Also, @jonhoo has done a lot of work on Buffer; I am curious what he thinks.

LucioFranco commented on May 1, 2020

I mean, the implementation of buffer is pretty straightforward; the code is long mostly due to comments and tracing. I'm not against including something like this, but it's also pretty straightforward to implement on your own. This caters specifically to things that a) cannot use Buffer, and b) fit exactly the API outlined in WorkerTask. I don't know how large that bucket of things is. I could easily see users whose needs exceed that of Buffer require even more fine-grained control (such as not wanting to use oneshot), in which case this new abstraction would sit mostly unused. I'm not even sure if tokio-tower could use it — I'd have to think more about that.

On a more technical level, I think a couple of changes are warranted. First, Request should probably be a generic parameter of the trait, rather than an associated type, to match Service. Second, execute should probably take a Stream rather than an mpsc::Receiver specifically. tokio-tower for example does not use mpsc, because it does not want to introduce buffering, and uses a custom single-element slot instead ("mediator").
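A sketch of the trait revised along those lines (one way to do it; here the Stream is a second trait parameter so each implementation can pick its own channel type, and the Future can own it):

use std::future::Future;
use futures::Stream;

trait WorkerTask<Request, S>
where
    S: Stream<Item = Self::Message>,
{
    type Response;
    type Error;
    type Message;
    type Future: Future;

    // Any Stream of messages will do: an mpsc receiver (wrapped as a
    // Stream where needed), a single-element slot like tokio-tower's
    // mediator, etc.
    fn execute(self, rx: S) -> Self::Future;
}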

For error handling, we cannot easily give a contract that WorkerTask must send on every Sender. That gets complicated on shutdown. I think we basically want to mirror Buffer exactly here, and then the caller can choose to panic if it gets Closed. As far as error propagation goes, yeah, it's a pain. The design in Buffer was what we arrived at after a lot of back and forth, but I'm certainly not married to the current design.

jonhoo commented on May 2, 2020

> I mean, the implementation of buffer is pretty straightforward; the code is long mostly due to comments and tracing. I'm not against including something like this, but it's also pretty straightforward to implement on your own. This caters specifically to things that a) cannot use Buffer, and b) fit exactly the API outlined in WorkerTask. I don't know how large that bucket of things is. I could easily see users whose needs exceed that of Buffer require even more fine-grained control (such as not wanting to use oneshot), in which case this new abstraction would sit mostly unused. I'm not even sure if tokio-tower could use it — I'd have to think more about that.

At a high-level, the idea of a worker task is fairly common:

  • For peer-to-peer protocols, you need to handle incoming messages from the peer and decide whether they are responses to requests you have sent, or requests from the peer. This is implemented as a worker task that waits on incoming messages from the peer or on outgoing requests from the Service. This is what Zebra does. There was also discussion on Discord where someone implemented this with tokio-tower by creating a tokio_tower::pipeline::server::Server that filters the messages and passes only the ones it categorizes as responses to a tokio_tower::pipeline::client::Client - this is probably not the most efficient method.

  • tokio-tower is similar to the peer-to-peer use-case, except all incoming messages are expected to be responses.

  • If you want to retry messages without changing the order, you need to wait until the previous request has successfully completed before sending the next. The simplest way to do this is to run a worker thread that takes a request, retries it to completion, then moves on to the next request.

  • On Discord, someone was describing a "batching" middleware (that would batch requests until a certain number is reached or until a timeout elapses). This would be implemented as a worker task that waits for the queue to fill or the timeout to elapse and then sends the request to the underlying service, returning the relevant part of the response to each individual request.

All of these have in common that their Service::call implementation sends requests to a long-running worker task and then needs to propagate errors from that worker back to the Service::call caller. It's straightforward to implement, but there are lots of small things you have to do each time, and it's a pain to re-implement them constantly.
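The shared shape, sketched with hypothetical names (the worker-side loop is elided, and poll_ready is simplified; reserving channel capacity correctly is much of what Buffer's internals deal with):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::{mpsc, oneshot};
use tower::Service;

// Hypothetical error: the worker hung up before answering
// (Buffer models this with its Closed error).
#[derive(Debug)]
struct WorkerGone;

struct ChannelService<Req, Res> {
    tx: mpsc::Sender<(Req, oneshot::Sender<Result<Res, WorkerGone>>)>,
}

impl<Req, Res> Service<Req> for ChannelService<Req, Res>
where
    Req: Send + 'static,
    Res: Send + 'static,
{
    type Response = Res;
    type Error = WorkerGone;
    type Future = Pin<Box<dyn Future<Output = Result<Res, WorkerGone>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), WorkerGone>> {
        // Simplified: a real implementation reserves channel capacity here.
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, request: Req) -> Self::Future {
        let (tx, rx) = oneshot::channel();
        let send = self.tx.clone();
        Box::pin(async move {
            // Hand the request to the worker along with a reply channel.
            send.send((request, tx)).await.map_err(|_| WorkerGone)?;
            // A dropped Sender means the worker died without answering.
            rx.await.map_err(|_| WorkerGone)?
        })
    }
}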

> tokio-tower for example does not use mpsc, because it does not want to introduce buffering, and uses a custom single-element slot instead ("mediator").

This would be a useful primitive to have in general, I've been using mpsc::channel(1) to emulate an "unbuffered" message slot (I think Zebra does this too).

> I think we basically want to mirror Buffer exactly here, and then the caller can choose to panic if it gets Closed. As far as error propagation goes, yeah, it's a pain. The design in Buffer was what we arrived at after a lot of back and forth, but I'm certainly not married to the current design.

I'm not opposed to this, but I would like the user to be able to control Service::Error rather than boxing the error (e.g. tower::buffer::Buffer wraps everything in ServiceError, which is effectively an Arc<Box<dyn Error>>).

Instead of providing a full Service implementation, perhaps providing the error-handling primitives would be more flexible. I could imagine a FusedService<E: Clone> (not a great name) that always returns the same error after it has occurred once. Maybe it could have a "handle" (i.e. an Arc<Mutex<Option<E>>>) that the worker task could use to set the error. There could be a matching Future implementation that wraps a oneshot::Receiver and returns the error if the oneshot::Sender is dropped.
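A rough sketch of that idea (hypothetical names; the worker writes its terminal error into the shared slot, and poll_ready reports it forever after):

use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tower::Service;

// Shared slot the worker task can use to publish its fatal error.
type ErrorSlot<E> = Arc<Mutex<Option<E>>>;

struct FusedService<S, E> {
    inner: S,
    slot: ErrorSlot<E>,
}

impl<S, Req, E> Service<Req> for FusedService<S, E>
where
    S: Service<Req, Error = E>,
    E: Clone,
{
    type Response = S::Response;
    type Error = E;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
        // Once an error has been recorded, keep returning a clone of it.
        if let Some(e) = self.slot.lock().unwrap().as_ref() {
            return Poll::Ready(Err(e.clone()));
        }
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, request: Req) -> Self::Future {
        self.inner.call(request)
    }
}

// The matching response future would wrap oneshot::Receiver and, if the
// Sender was dropped, return the error from the slot instead.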

saleemrashid commented on May 10, 2020