redis-async-rs

Connection cannot be established when using `.wait` (but works when using `tokio::run`)

Open · rivertam opened this issue 6 years ago · 1 comment

I don't know whether this is expected behavior; the library may well rely on a running tokio runtime. Even so, the failure could (in my opinion) be reported more cleanly.

As it is, if I change `examples/pubsub.rs` to use `.wait().unwrap()` instead of `tokio::run(...)`, I get:

[2019-07-28T14:03:33Z ERROR redis_async::reconnect] Connection cannot be established: Cannot spawn a pubsub connection: SpawnError { is_shutdown: true }
ERROR, cannot receive messages. Error message: Connection(ConnectionFailed)

Unfortunately, this led me down a rather annoying rabbit hole: the problem was with the event loop, not with networking, which is what `ConnectionFailed` implies.
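For readers hitting the same log line, here is a minimal toy model of why a spawn can fail outside `tokio::run`. It is a sketch under assumptions, not tokio's implementation: `spawn`, `run`, `CURRENT`, and this `SpawnError` struct are hypothetical stand-ins written with the standard library only. It assumes the executor is looked up from thread-local state that exists only while `run` is active, which would be consistent with the `SpawnError { is_shutdown: true }` in the log above.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;

// Hypothetical task type for this toy model: a boxed one-shot closure.
type Task = Box<dyn FnOnce()>;

thread_local! {
    // Queue of the executor running on this thread, or None when there is none.
    static CURRENT: RefCell<Option<VecDeque<Task>>> = RefCell::new(None);
}

/// Stand-in for the error in the log; not tokio's actual type.
#[derive(Debug, PartialEq)]
pub struct SpawnError {
    pub is_shutdown: bool,
}

/// Queue a task on the current executor; fails when none is installed.
pub fn spawn(task: impl FnOnce() + 'static) -> Result<(), SpawnError> {
    CURRENT.with(|cur| match cur.borrow_mut().as_mut() {
        Some(queue) => {
            queue.push_back(Box::new(task));
            Ok(())
        }
        None => Err(SpawnError { is_shutdown: true }),
    })
}

/// Install an executor for the duration of `entry`, then drain its queue.
pub fn run(entry: impl FnOnce()) {
    CURRENT.with(|cur| *cur.borrow_mut() = Some(VecDeque::new()));
    entry();
    loop {
        // Pop in its own statement so the borrow ends before the task runs.
        let next = CURRENT.with(|cur| cur.borrow_mut().as_mut().and_then(|q| q.pop_front()));
        match next {
            Some(task) => task(),
            None => break,
        }
    }
    CURRENT.with(|cur| *cur.borrow_mut() = None);
}
```

In this model, calling `spawn` before `run` returns `Err(SpawnError { is_shutdown: true })`, while the same call made from inside the closure passed to `run` succeeds. Blocking with `.wait()` on the main thread corresponds to the first case.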

rivertam · Jul 28 '19 14:07

The same happens with `paired_connect`, so it's not specific to PubSub.

The code to test this is:

extern crate futures;
#[macro_use]
extern crate redis_async;
extern crate tokio;

use std::env;
use std::sync::Arc;

use futures::sync::oneshot;
use futures::{future, Future};

use redis_async::client;

// An artificial "realistic" non-trivial example to demonstrate usage
fn main() {
    // Create some completely arbitrary "test data"
    let test_data_size = 10;
    let test_data: Vec<_> = (0..test_data_size).map(|x| (x, x.to_string())).collect();

    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:6379".to_string())
        .parse()
        .unwrap();

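    // Block on the connection future with `.wait()` before `tokio::run` has
    // started; this is the call under test.
    let connection = client::paired_connect(&addr).wait().unwrap();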
    let (tx, rx) = oneshot::channel();

    let send_data = {
        let connection = Arc::new(connection);
        let futures = test_data.into_iter().map(move |data| {
            let connection_inner = connection.clone();
            connection
                .send(resp_array!["INCR", "realistic_test_ctr"])
                .and_then(move |ctr: String| {
                    let key = format!("rt_{}", ctr);
                    let d_val = data.0.to_string();
                    connection_inner.send_and_forget(resp_array!["SET", &key, d_val]);
                    connection_inner.send(resp_array!["SET", data.1, key])
                })
        });
        future::join_all(futures)
    };

    let deliver = send_data.then(|result| match result {
        Ok(result) => match tx.send(result) {
            Ok(_) => future::ok(()),
            Err(e) => {
                println!("Unexpected error: {:?}", e);
                future::err(())
            }
        },
        Err(e) => {
            println!("Unexpected error: {:?}", e);
            future::err(())
        }
    });

    tokio::run(deliver);

    let result: Vec<String> = rx.wait().expect("Waiting for delivery");
    println!("RESULT: {:?}", result);
    assert_eq!(result.len(), test_data_size);
}

This is lightly adapted from `examples/realistic.rs`, and I believe it should do the same thing.

Again, this is not really a "bug": if you need tokio, you need tokio. But the error message itself is problematic in my opinion.
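One way the two failure modes could be kept apart is sketched below. This is only an illustration of the suggestion, not redis-async's actual error type: `ConnectError`, its variant names, and the message wording are all hypothetical.

```rust
use std::fmt;

/// Hypothetical error type; names and messages are illustrative only.
#[derive(Debug, PartialEq)]
pub enum ConnectError {
    /// The connection task could not be spawned because no executor was running.
    NoExecutor,
    /// The network connection itself failed, with a short reason.
    Network(String),
}

impl fmt::Display for ConnectError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ConnectError::NoExecutor => write!(
                f,
                "cannot spawn connection task: no running executor (is this running inside tokio?)"
            ),
            ConnectError::Network(reason) => write!(f, "connection failed: {}", reason),
        }
    }
}

impl std::error::Error for ConnectError {}
```

With a split like this, the `.wait()` case would surface as an executor problem, and `ConnectionFailed`-style messages would be reserved for real network errors.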

rivertam · Jul 28 '19 14:07