tk-listen

Please provide a full example of select() usage for a clean shutdown

Open lu-zero opened this issue 6 years ago • 9 comments

I'm trying to follow the example in the documentation, but I'm currently completely lost on the error message reported.

lu-zero avatar Nov 13 '18 16:11 lu-zero

What do you have in mind? Is it a clean shutdown of the application, or just closing a single bound socket? Can you show me the error message (is it a compile-time or a run-time error)?

tailhook avatar Nov 13 '18 22:11 tailhook

My, supposedly tiny, application should do something like this:

  1. listen on an address for a single connection
  2. once it receives the connection, listen for as many connections as possible on another address
  3. once the master connection closes, drop all open secondary connections and stop listening for more
  4. go back to 1

    let srv_prod = l_prod
        .incoming()
        .sleep_on_error(Duration::from_millis(100))
        .map(move |socket| {
            setup(socket, prod_state.clone(), true, buffer_size.clone());

            let l_cons = TcpListener::bind(&(cfg.output_host, cfg.port + 1).into()).unwrap();
            let cons_state = state.clone();

            let (tx, rx) = oneshot::channel();

            let srv_cons = l_cons
                .incoming()
                .sleep_on_error(Duration::from_millis(100))
                .map(move |socket| {
                    setup(socket, cons_state.clone(), false, buffer_size.clone());
                    Ok(())
                })
                .listen(1000)
                .select(rx.into());
            tokio::run(srv_cons);

            Ok(())
        })
        .listen(1);

    rt.spawn(srv_prod);

    rt.shutdown_on_idle().wait().unwrap();

error[E0271]: type mismatch resolving `<futures::Select<tk_listen::Listen<futures::stream::Map<tk_listen::SleepOnError<tokio::net::tcp::Incoming>, [closure@src/main.rs:264:22: 267:18 cons_state:_, buffer_size:_]>>, _> as futures::Future>::Error == ()`
   --> src/main.rs:270:13
    |
270 |             tokio::run(srv_cons);
    |             ^^^^^^^^^^ expected tuple, found ()
    |
    = note: expected type `((), futures::SelectNext<tk_listen::Listen<futures::stream::Map<tk_listen::SleepOnError<tokio::net::tcp::Incoming>, [closure@src/main.rs:264:22: 267:18 cons_state:_, buffer_size:_]>>, _>)`
               found type `()`
    = note: required by `tokio::run`

lu-zero avatar Nov 13 '18 22:11 lu-zero

Just convert the result of the future:

tokio::run(srv_cons.map(|_| ()).map_err(|_| ()));

The idea is that .select() returns a tuple: the result of whichever future finished first and a wrapper type around the other future. The error side has the same tuple shape, which is why the compiler expected a tuple where tokio::run wants (). In your case it's fine to shut down (that is, to return a value from the future) in either case: when the listening socket shuts down (which in practice never happens), or when the shutdown channel receives a message or is closed.

It may be counter-intuitive that we also discard errors, but the listener doesn't propagate errors as far as I remember and channel errors are here only to close it when the other side is closed.
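
The shape of the mismatch can be reproduced without any async machinery. The following is a std-only analogy, not the futures API: `Rest`, `SelectOutcome`, `run_unit`, and `discard` are hypothetical names standing in for the leftover-future wrapper, the value a selected future resolves to, the unit-only requirement of tokio::run, and the suggested `.map(|_| ()).map_err(|_| ())` conversion.

```rust
/// Hypothetical stand-in for the wrapper around the future that did not finish.
#[derive(Debug)]
pub struct Rest;

/// Hypothetical stand-in for what a select()-ed future resolves to:
/// both the success side and the error side are tuples.
pub type SelectOutcome = Result<((), Rest), ((), Rest)>;

/// Hypothetical stand-in for a runner that only accepts unit item and unit error.
pub fn run_unit(outcome: Result<(), ()>) -> &'static str {
    match outcome {
        Ok(()) => "finished",
        Err(()) => "finished with error",
    }
}

/// Throw away both tuple payloads so the outcome fits `run_unit`.
pub fn discard(outcome: SelectOutcome) -> Result<(), ()> {
    outcome.map(|_| ()).map_err(|_| ())
}
```

Passing a `SelectOutcome` straight to `run_unit` fails to compile for the same reason as the error above; `run_unit(discard(outcome))` type-checks.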

tailhook avatar Nov 15 '18 12:11 tailhook

Oh, by the way, you are calling tokio::run, which is blocking, in an asynchronous context (the tokio loop named rt in your code). This is a very bad idea.

You should be able to return the srv_cons future (with all the conversions) with the same effect.

tailhook avatar Nov 15 '18 12:11 tailhook

The idea was to prevent further producers from connecting; probably there is a cleaner solution now :)

lu-zero avatar Nov 15 '18 12:11 lu-zero

> The idea was to prevent further producers from connecting; probably there is a cleaner solution now :)

Sure. If you map the "incoming" stream to a future, the listener waits for that future to complete before accepting new connections (in fact, .listen(N) runs up to N concurrent futures, and N is 1 in your case). So the effect of returning the future is exactly what you want.
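
To make the effect of the limit concrete, here is a toy model in plain Rust. It is not how tk-listen is implemented: `accept_times` is a hypothetical helper that assumes every connection is already waiting at time 0, takes how long each handler runs, and reports when each connection would be accepted if at most `limit` handlers may run at once.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Toy model: the time at which each pending connection gets accepted
/// when at most `limit` handlers are allowed to run concurrently.
pub fn accept_times(durations: &[u64], limit: usize) -> Vec<u64> {
    assert!(limit > 0, "limit must be at least 1");
    // Min-heap of finish times for the handlers currently running.
    let mut in_flight: BinaryHeap<Reverse<u64>> = BinaryHeap::new();
    let mut now: u64 = 0;
    let mut accepted = Vec::with_capacity(durations.len());
    for &duration in durations {
        if in_flight.len() == limit {
            // Every slot is busy: wait for the earliest handler to finish.
            let Reverse(done) = in_flight.pop().unwrap();
            now = now.max(done);
        }
        accepted.push(now);
        in_flight.push(Reverse(now + duration));
    }
    accepted
}
```

With a limit of 1 the second connection is accepted only once the first handler is done, which is the behaviour described above for .listen(1); a larger limit lets later connections in earlier.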

tailhook avatar Nov 15 '18 12:11 tailhook

With the suggested change I got:

error[E0282]: type annotations needed
   --> src/main.rs:259:28
    |
259 |             let (tx, rx) = oneshot::channel();
    |                 --------   ^^^^^^^^^^^^^^^^ cannot infer type for `T`
    |                 |
    |                 consider giving the pattern a type

lu-zero avatar Nov 16 '18 14:11 lu-zero

Oops, sorry, the following should work:

let (tx, rx) = oneshot::channel::<()>();

This means: use the () type for the channel. You can also put the annotation in .map(...), but that would be a bit more verbose.
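
The same inference problem, and the same fix, can be tried with the standard library alone. This sketch uses std::sync::mpsc as a stand-in for the futures oneshot channel, so it is only an analogy; `shutdown_by_drop` is a hypothetical name. It also shows a receiver treating the closing of the other side as the shutdown signal.

```rust
use std::sync::mpsc;
use std::thread;

/// Returns true when the worker saw the channel close without any message.
pub fn shutdown_by_drop() -> bool {
    // Nothing is ever sent, so the element type cannot be inferred;
    // the turbofish pins it to ().
    let (tx, rx) = mpsc::channel::<()>();
    let worker = thread::spawn(move || {
        // recv() blocks until a message arrives or every sender is gone;
        // an Err here means the sending side was closed.
        rx.recv().is_err()
    });
    // Dropping the sender is the shutdown signal.
    drop(tx);
    worker.join().unwrap()
}
```

Removing the `::<()>` annotation makes this fail with a "type annotations needed" error as well, because no value is ever sent through the channel.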

tailhook avatar Nov 16 '18 15:11 tailhook

It seems to be working with a few more .map(...).map_err() calls; sadly, I couldn't polish a proper full example yet :)

thank you a lot!

lu-zero avatar Nov 19 '18 13:11 lu-zero