rustis
[bug] Panic when using pub/sub with bb8 connection pooling
I'm using a bb8 connection pool in my application, which was created like this:
let manager = PooledClientManager::new(REDIS_URL.to_string())
    .expect("failed to create redis client manager");
rustis::bb8::Pool::builder()
    .max_size(256)
    .build_unchecked(manager)
Then a tokio task in my application needs to listen to pub/sub for a while, so it retrieves a connection from the pool:
let (sink, stream) = pool.get().await?.create_pub_sub()
When the task completes, the connection is returned to the pool. At the same time, other parts of my application use the same bb8 connection pool for connections that have nothing to do with pub/sub.
After my application has been running for a few moments and connections have been recycled in and out of the pool, another part of my code retrieves a connection from the pool and uses it to send a command (e.g. xadd). This causes a panic in rustis internals. I have pasted the panic at the bottom of this issue.
If I don't use a connection pool for the pub/sub connections this issue goes away (so that is what I'm doing for now).
The same error also occurs if I use the previous version of rustis and call subscribe instead of create_pub_sub, so I do not think this issue is caused by the latest changes on main.
The error and backtrace:
thread 'rocket-worker-thread' panicked at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:545:17:
[127.0.0.1:6381] Received unexpected message: Ok(SimpleString("PONG"))
stack backtrace:
0: rust_begin_unwind
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:645:5
1: core::panicking::panic_fmt
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panicking.rs:72:14
2: rustis::network::network_handler::NetworkHandler::receive_result
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:545:17
3: rustis::network::network_handler::NetworkHandler::handle_result::{{closure}}
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:410:25
4: rustis::network::network_handler::NetworkHandler::network_loop::{{closure}}
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:153:47
5: rustis::network::network_handler::NetworkHandler::connect::{{closure}}::{{closure}}
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:138:60
6: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/future/future.rs:125:9
7: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:328:17
8: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/loom/std/unsafe_cell.rs:16:9
9: tokio::runtime::task::core::Core<T,S>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:317:13
10: tokio::runtime::task::harness::poll_future::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:485:19
11: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panic/unwind_safe.rs:272:9
12: std::panicking::try::do_call
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:552:40
13: ___rust_try
14: std::panicking::try
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:516:19
15: std::panic::catch_unwind
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panic.rs:142:14
16: tokio::runtime::task::harness::poll_future
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:473:18
17: tokio::runtime::task::harness::Harness<T,S>::poll_inner
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:208:27
18: tokio::runtime::task::harness::Harness<T,S>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:153:15
19: tokio::runtime::task::raw::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:271:5
20: tokio::runtime::task::raw::RawTask::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:201:18
21: tokio::runtime::task::LocalNotified<S>::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/mod.rs:416:9
22: tokio::runtime::scheduler::multi_thread::worker::Context::run_task::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:576:13
23: tokio::runtime::coop::with_budget
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/coop.rs:107:5
24: tokio::runtime::coop::budget
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/coop.rs:73:5
25: tokio::runtime::scheduler::multi_thread::worker::Context::run_task
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:575:9
26: tokio::runtime::scheduler::multi_thread::worker::Context::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:526:24
27: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:491:21
28: tokio::runtime::context::scoped::Scoped<T>::set
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context/scoped.rs:40:9
29: tokio::runtime::context::set_scheduler::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context.rs:176:26
30: std::thread::local::LocalKey<T>::try_with
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/thread/local.rs:270:16
31: std::thread::local::LocalKey<T>::with
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/thread/local.rs:246:9
32: tokio::runtime::context::set_scheduler
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context.rs:176:9
33: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:486:9
34: tokio::runtime::context::runtime::enter_runtime
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context/runtime.rs:65:16
35: tokio::runtime::scheduler::multi_thread::worker::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:478:5
36: tokio::runtime::scheduler::multi_thread::worker::Launch::launch::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:447:45
37: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/task.rs:42:21
38: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:328:17
39: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/loom/std/unsafe_cell.rs:16:9
40: tokio::runtime::task::core::Core<T,S>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:317:13
41: tokio::runtime::task::harness::poll_future::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:485:19
42: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panic/unwind_safe.rs:272:9
43: std::panicking::try::do_call
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:552:40
44: ___rust_try
45: std::panicking::try
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:516:19
46: std::panic::catch_unwind
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panic.rs:142:14
47: tokio::runtime::task::harness::poll_future
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:473:18
48: tokio::runtime::task::harness::Harness<T,S>::poll_inner
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:208:27
49: tokio::runtime::task::harness::Harness<T,S>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:153:15
50: tokio::runtime::task::raw::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:271:5
51: tokio::runtime::task::raw::RawTask::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:201:18
52: tokio::runtime::task::UnownedTask<S>::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/mod.rs:453:9
53: tokio::runtime::blocking::pool::Task::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/pool.rs:159:9
54: tokio::runtime::blocking::pool::Inner::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/pool.rs:513:17
55: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/pool.rs:471:13
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
@richardhenry,
It may be a race condition: the PubSubSink has been dropped and then tries to unsubscribe in the background while the connection is being reused...
Could you please call close().await? on the sink to see if it fixes the problem?
Cheers,
Michaël
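To make the suspected race concrete, here is a toy model of the failure mode. It is only a sketch under assumptions: ToyConnection, abandon_oldest and stale_reply_scenario are hypothetical names, and nothing here reflects how rustis's NetworkHandler is actually implemented. It only shows how a reply that arrives after its waiter has gone finds nothing to match against, which is the shape of the "Received unexpected message" error above.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a connection's bookkeeping (hypothetical, not rustis code):
/// replies are matched to commands purely by arrival order.
#[derive(Default)]
struct ToyConnection {
    /// Sequence numbers of commands still waiting for a reply.
    pending: VecDeque<u64>,
}

impl ToyConnection {
    /// Record that a command was written and a caller awaits its reply.
    fn send(&mut self, command_seq: u64) {
        self.pending.push_back(command_seq);
    }

    /// The caller of the oldest command went away (dropped, timed out, ...).
    fn abandon_oldest(&mut self) -> Option<u64> {
        self.pending.pop_front()
    }

    /// Match an incoming reply to the oldest waiting command, or report
    /// that nobody is waiting for it.
    fn receive(&mut self, reply: &str) -> Result<u64, String> {
        self.pending
            .pop_front()
            .ok_or_else(|| format!("Received unexpected message: {}", reply))
    }
}

/// Replays the assumed sequence: a command is sent, its waiter disappears,
/// and the reply still arrives on the connection afterwards.
fn stale_reply_scenario() -> Result<u64, String> {
    let mut conn = ToyConnection::default();
    conn.send(104);
    conn.abandon_oldest();
    conn.receive("PONG")
}
```

In this model the error does not depend on which command the next user of the connection sends; it only depends on a reply arriving while the pending queue is empty.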
Still seeing the same errors even when explicitly calling sink.close().await? on the PubSubSplitSink before dropping it:
thread 'rocket-worker-thread' panicked at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:545:17:
[127.0.0.1:6381] Received unexpected message: Ok(SimpleString("PONG"))
stack backtrace:
0: rust_begin_unwind
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:645:5
1: core::panicking::panic_fmt
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panicking.rs:72:14
2: rustis::network::network_handler::NetworkHandler::receive_result
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:545:17
3: rustis::network::network_handler::NetworkHandler::handle_result::{{closure}}
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:410:25
4: rustis::network::network_handler::NetworkHandler::network_loop::{{closure}}
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:153:47
5: rustis::network::network_handler::NetworkHandler::connect::{{closure}}::{{closure}}
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:138:60
[...]
Hi @richardhenry,
I could not reproduce the problem with this test:
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
#[serial]
async fn pub_sub() -> Result<()> {
    log_try_init();
    let manager = PooledClientManager::new(get_default_addr())?;
    let pool = crate::bb8::Pool::builder()
        .build(manager)
        .await?;
    loop {
        let (mut sink, mut stream) = pool.get().await.unwrap().create_pub_sub().split();
        sink.subscribe("foo").await?;
        pool.get().await.unwrap().publish("foo", "bar").await?;
        let message = stream.next().await.unwrap()?;
        assert_eq!(b"foo".to_vec(), message.channel);
        assert_eq!(b"bar".to_vec(), message.payload);
        sink.close().await?;
        let _value: String = pool.get().await.unwrap().get("key").await?;
    }
}
Could you help by modifying the test to reproduce the problem? It may be too simple...
Hello,
I just fixed a bad status problem in the NetworkHandler implementation. Let me know if it fixes your problem. https://github.com/dahomey-technologies/rustis/commit/b107cdbdfd1c553edf5a55ff81f63aea7dbcd662
I'm struggling to repro this in a test also. I have tried spawning multiple tokio tasks using the pool concurrently and that also isn't triggering the problem. I'll try your latest commit with my application and see if I'm still seeing these issues. Thanks
Any news?
Still seeing similar issues unfortunately. The error I'm getting from rustis is Client("send failed because receiver is gone")
The debug logs I'm seeing from rustis immediately before the issue are:
[2024-04-20T04:57:56Z DEBUG rustis::network::cluster_connection] [127.0.0.1:6381] Analyzing command Command { name: "PING", args: CommandArgs { args: [] }, kill_connection_on_write: 0, command_seq: 104 }
[2024-04-20T04:57:56Z DEBUG rustis::network::cluster_connection] [127.0.0.1:6381] keys: [], slots: []
[2024-04-20T04:57:56Z DEBUG rustis::network::standalone_connection] [127.0.0.1:6381] Sending Command { name: "PING", args: CommandArgs { args: [] }, kill_connection_on_write: 0, command_seq: 104 }
[2024-04-20T04:57:56Z DEBUG rustis::network::standalone_connection] [127.0.0.1:6382] Sending Command { name: "PING", args: CommandArgs { args: [] }, kill_connection_on_write: 0, command_seq: 104 }
[2024-04-20T04:57:56Z DEBUG rustis::network::standalone_connection] [127.0.0.1:6380] Sending Command { name: "PING", args: CommandArgs { args: [] }, kill_connection_on_write: 0, command_seq: 104 }
[2024-04-20T04:57:56Z DEBUG rustis::network::standalone_connection] [127.0.0.1:6381] Received result SimpleString("PONG")
[2024-04-20T04:57:56Z DEBUG rustis::network::standalone_connection] [127.0.0.1:6382] Received result SimpleString("PONG")
[2024-04-20T04:57:56Z DEBUG rustis::network::standalone_connection] [127.0.0.1:6380] Received result SimpleString("PONG")
[2024-04-20T04:57:56Z DEBUG rustis::network::cluster_connection] [127.0.0.1:6381] no_response_policy
[2024-04-20T04:57:56Z DEBUG rustis::network::cluster_connection] [127.0.0.1:6381] no_response_policy
The backtrace:
thread 'rocket-worker-thread' panicked at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/b168d8e/src/network/network_handler.rs:556:17:
[127.0.0.1:6381] Received unexpected message: Ok(SimpleString("PONG"))
stack backtrace:
0: rust_begin_unwind
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:645:5
1: core::panicking::panic_fmt
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panicking.rs:72:14
2: rustis::network::network_handler::NetworkHandler::receive_result
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/b168d8e/src/network/network_handler.rs:556:17
3: rustis::network::network_handler::NetworkHandler::handle_result::{{closure}}
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/b168d8e/src/network/network_handler.rs:398:25
4: rustis::network::network_handler::NetworkHandler::network_loop::{{closure}}
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/b168d8e/src/network/network_handler.rs:153:47
5: rustis::network::network_handler::NetworkHandler::connect::{{closure}}::{{closure}}
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/b168d8e/src/network/network_handler.rs:138:60
6: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/future/future.rs:125:9
7: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/core.rs:328:17
8: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/loom/std/unsafe_cell.rs:16:9
9: tokio::runtime::task::core::Core<T,S>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/core.rs:317:13
10: tokio::runtime::task::harness::poll_future::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:485:19
11: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panic/unwind_safe.rs:272:9
12: std::panicking::try::do_call
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:552:40
13: ___rust_try
14: std::panicking::try
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:516:19
15: std::panic::catch_unwind
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panic.rs:142:14
16: tokio::runtime::task::harness::poll_future
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:473:18
17: tokio::runtime::task::harness::Harness<T,S>::poll_inner
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:208:27
18: tokio::runtime::task::harness::Harness<T,S>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:153:15
19: tokio::runtime::task::raw::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/raw.rs:271:5
20: tokio::runtime::task::raw::RawTask::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/raw.rs:201:18
21: tokio::runtime::task::LocalNotified<S>::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/mod.rs:427:9
22: tokio::runtime::scheduler::multi_thread::worker::Context::run_task::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:576:13
23: tokio::runtime::coop::with_budget
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/coop.rs:107:5
24: tokio::runtime::coop::budget
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/coop.rs:73:5
25: tokio::runtime::scheduler::multi_thread::worker::Context::run_task
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:575:9
26: tokio::runtime::scheduler::multi_thread::worker::Context::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:526:24
27: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:491:21
28: tokio::runtime::context::scoped::Scoped<T>::set
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/context/scoped.rs:40:9
29: tokio::runtime::context::set_scheduler::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/context.rs:176:26
30: std::thread::local::LocalKey<T>::try_with
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/thread/local.rs:270:16
31: std::thread::local::LocalKey<T>::with
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/thread/local.rs:246:9
32: tokio::runtime::context::set_scheduler
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/context.rs:176:9
33: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:486:9
34: tokio::runtime::context::runtime::enter_runtime
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/context/runtime.rs:65:16
35: tokio::runtime::scheduler::multi_thread::worker::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:478:5
36: tokio::runtime::scheduler::multi_thread::worker::Launch::launch::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:447:45
37: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/blocking/task.rs:42:21
38: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/core.rs:328:17
39: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/loom/std/unsafe_cell.rs:16:9
40: tokio::runtime::task::core::Core<T,S>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/core.rs:317:13
41: tokio::runtime::task::harness::poll_future::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:485:19
42: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panic/unwind_safe.rs:272:9
43: std::panicking::try::do_call
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:552:40
44: ___rust_try
45: std::panicking::try
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:516:19
46: std::panic::catch_unwind
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panic.rs:142:14
47: tokio::runtime::task::harness::poll_future
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:473:18
48: tokio::runtime::task::harness::Harness<T,S>::poll_inner
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:208:27
49: tokio::runtime::task::harness::Harness<T,S>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:153:15
50: tokio::runtime::task::raw::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/raw.rs:271:5
51: tokio::runtime::task::raw::RawTask::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/raw.rs:201:18
52: tokio::runtime::task::UnownedTask<S>::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/mod.rs:464:9
53: tokio::runtime::blocking::pool::Task::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/blocking/pool.rs:159:9
54: tokio::runtime::blocking::pool::Inner::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/blocking/pool.rs:513:17
55: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/blocking/pool.rs:471:13
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
Please try the latest version, in which I fixed a bug that occurred when subscribing to the same channel twice on the same client instance.
Before the fix, the pub/sub state could be corrupted, leading to the kind of error you are getting.
After the fix, you will get an explicit error on the second subscription.
@richardhenry
As a test (not an actual fix), could you please increase the command timeout in the configuration, to 1s for instance?
It looks as if the connection is still in pub/sub mode: sink.close().await? does not actually wait for the unsubscribe to complete, and the connection is then put back into the pool in an intermediate state.
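One way to picture a guard against that intermediate state is a pool that refuses to recycle a connection that has not returned to normal mode. The sketch below is a toy under assumptions: Mode, ToyConn and ToyPool are hypothetical types, and it does not describe what rustis or its PooledClientManager actually does. bb8 itself offers a comparable hook in ManageConnection::has_broken, but the thread does not confirm how rustis uses it.

```rust
/// Hypothetical connection state; rustis does not necessarily expose this.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode {
    Normal,
    /// Subscribed, or an unsubscribe was sent but is not yet confirmed.
    PubSub,
}

/// Toy connection carrying only an id and its current mode.
struct ToyConn {
    id: u32,
    mode: Mode,
}

/// Toy pool: a stack of idle connections plus a counter for new ones.
struct ToyPool {
    idle: Vec<ToyConn>,
    next_id: u32,
}

impl ToyPool {
    fn new() -> Self {
        ToyPool { idle: Vec::new(), next_id: 0 }
    }

    /// Hand out an idle connection, or open a fresh one if none is idle.
    fn get(&mut self) -> ToyConn {
        match self.idle.pop() {
            Some(conn) => conn,
            None => {
                self.next_id += 1;
                ToyConn { id: self.next_id, mode: Mode::Normal }
            }
        }
    }

    /// Take a connection back. It is kept only if it is in normal mode;
    /// otherwise it is dropped here. Returns true when it was recycled.
    fn put(&mut self, conn: ToyConn) -> bool {
        if conn.mode == Mode::Normal {
            self.idle.push(conn);
            true
        } else {
            false
        }
    }
}
```

With such a check, a connection returned while still in pub/sub mode is discarded and the next caller gets a fresh one, at the cost of an extra reconnect.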
New fix in release 0.13.3.
Thanks - I'll give this a try later this week
Closing this issue for now. Please feel free to reopen it if the problem persists.