tk-listen
tk-listen copied to clipboard
Please provide a full example of select() usage for a clean shutdown
I'm trying to follow the example in the documentation, it but I'm currently completely lost on the error message reported.
What do you have in mind? Is it clean shutdown of the application, or is it just closing single bound socket? Can you show me the error message (is it compile-time or run-time error?)
My, supposedly tiny, application should do something like this:
- listen on an address for a single connection
- once it receives the connection listen for as many connections as possible on another address
- once the master connection closes, drop all the secondary connections open and stop listening for more
- go back to 1
let srv_prod = l_prod
.incoming()
.sleep_on_error(Duration::from_millis(100))
.map(move |socket| {
setup(socket, prod_state.clone(), true, buffer_size.clone());
let l_cons = TcpListener::bind(&(cfg.output_host, cfg.port + 1).into()).unwrap();
let cons_state = state.clone();
let (tx, rx) = oneshot::channel();
let srv_cons = l_cons
.incoming()
.sleep_on_error(Duration::from_millis(100))
.map(move |socket| {
setup(socket, cons_state.clone(), false, buffer_size.clone());
Ok(())
})
.listen(1000)
.select(rx.into());
tokio::run(srv_cons);
Ok(())
})
.listen(1);
rt.spawn(srv_prod);
rt.shutdown_on_idle().wait().unwrap();
error[E0271]: type mismatch resolving `<futures::Select<tk_listen::Listen<futures::stream::Map<tk_listen::SleepOnError<tokio::net::tcp::Incoming>, [closure@src/main.rs:264:22: 267:18 cons_state:_, buffer_size:_]>>, _> as futures::Future>::Error == ()`
--> src/main.rs:270:13
|
270 | tokio::run(srv_cons);
| ^^^^^^^^^^ expected tuple, found ()
|
= note: expected type `((), futures::SelectNext<tk_listen::Listen<futures::stream::Map<tk_listen::SleepOnError<tokio::net::tcp::Incoming>, [closure@src/main.rs:264:22: 267:18 cons_state:_, buffer_size:_]>>, _>)`
found type `()`
= note: required by `tokio::run`
Just convert the result on of the future:
tokio::run(srv_cons.map(|_| ()).map_err(|_| ()));
The idea is that .select()
returns a tuple: the result of the one future and a wrapper type around the second future. In your case, it's fine to shut down (return value from a future) in any case, i.e. if either listening socket shuts down (which in turn never happens) or when shutdown channel received a message or closed.
It may be counter-intuitive that we also discard errors, but the listener doesn't propagate errors as far as I remember and channel errors are here only to close it when the other side is closed.
Oh, by the way, do you use tokio::run
which is blocking in asynchronous context (tokio loop that named rt
in your code). This is a very bad idea.
You should be able to return srv_cons
future (with all the conversions) with the same effect.
The idea was to prevent further producers to connect, probably there is a cleaner solution now :)
The idea was to prevent further producers to connect, probably there is a cleaner solution now :)
Sure. If you map "incoming" stream to a future, it waits for the future to complete before accepting new connections (in fact it runs '.listen(N)competitive futures which is
1` in your case). So the effect of returning the future is exactly what you want.
With the suggested change I got:
error[E0282]: type annotations needed
--> src/main.rs:259:28
|
259 | let (tx, rx) = oneshot::channel();
| -------- ^^^^^^^^^^^^^^^^ cannot infer type for `T`
| |
| consider giving the pattern a type
Oops, sorry, the following should work:
let (tx, rx) = oneshot::channel::<()>();
This means use ()
type for the channel. You can also apply annotation in .map(...)
but it could be a little bit more verbose.
It seems working with few more .map(...).map_err()
, sadly I couldn't polish a proper full example yet :)
thank you a lot!