rustis icon indicating copy to clipboard operation
rustis copied to clipboard

[bug] Panic when using pub/sub with bb8 connection pooling

Open richardhenry opened this issue 11 months ago • 9 comments

I'm using a bb8 connection pool in my application, which was created like this:

let manager = PooledClientManager::new(REDIS_URL.to_string())
    .expect("failed to create redis client manager");
rustis::bb8::Pool::builder()
    .max_size(256)
    .build_unchecked(manager)

Then a tokio task in my application needs to listen to pub/sub for a while, so it retrieves a connection from the pool:

let (sink, stream) = pool.get().await?.create_pub_sub()

When the task completes, the connection is returned to the pool. At the same time, the bb8 connection pool is being used for other parts of my application for non pub/sub related connections.

After my application has been running for a few moments and connections have been recycled in/out of the pool, another part of my code retrieves a connection from the pool and uses it to send a command (e.g. xadd). This will cause a panic in rustis internals. I have pasted the panic at the bottom of this issue.

If I don't use a connection pool for the pub/sub connections this issue goes away (so that is what I'm doing for now).

The same error also occurs if I use the previous version of rustis and call subscribe instead of create_pub_sub, so I do not think this issue is caused by the latest changes on main.

The error and backtrace:

thread 'rocket-worker-thread' panicked at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:545:17:
[127.0.0.1:6381] Received unexpected message: Ok(SimpleString("PONG"))
stack backtrace:
   0: rust_begin_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:645:5
   1: core::panicking::panic_fmt
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panicking.rs:72:14
   2: rustis::network::network_handler::NetworkHandler::receive_result
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:545:17
   3: rustis::network::network_handler::NetworkHandler::handle_result::{{closure}}
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:410:25
   4: rustis::network::network_handler::NetworkHandler::network_loop::{{closure}}
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:153:47
   5: rustis::network::network_handler::NetworkHandler::connect::{{closure}}::{{closure}}
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:138:60
   6: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/future/future.rs:125:9
   7: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:328:17
   8: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/loom/std/unsafe_cell.rs:16:9
   9: tokio::runtime::task::core::Core<T,S>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:317:13
  10: tokio::runtime::task::harness::poll_future::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:485:19
  11: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panic/unwind_safe.rs:272:9
  12: std::panicking::try::do_call
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:552:40
  13: ___rust_try
  14: std::panicking::try
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:516:19
  15: std::panic::catch_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panic.rs:142:14
  16: tokio::runtime::task::harness::poll_future
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:473:18
  17: tokio::runtime::task::harness::Harness<T,S>::poll_inner
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:208:27
  18: tokio::runtime::task::harness::Harness<T,S>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:153:15
  19: tokio::runtime::task::raw::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:271:5
  20: tokio::runtime::task::raw::RawTask::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:201:18
  21: tokio::runtime::task::LocalNotified<S>::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/mod.rs:416:9
  22: tokio::runtime::scheduler::multi_thread::worker::Context::run_task::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:576:13
  23: tokio::runtime::coop::with_budget
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/coop.rs:107:5
  24: tokio::runtime::coop::budget
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/coop.rs:73:5
  25: tokio::runtime::scheduler::multi_thread::worker::Context::run_task
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:575:9
  26: tokio::runtime::scheduler::multi_thread::worker::Context::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:526:24
  27: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:491:21
  28: tokio::runtime::context::scoped::Scoped<T>::set
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context/scoped.rs:40:9
  29: tokio::runtime::context::set_scheduler::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context.rs:176:26
  30: std::thread::local::LocalKey<T>::try_with
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/thread/local.rs:270:16
  31: std::thread::local::LocalKey<T>::with
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/thread/local.rs:246:9
  32: tokio::runtime::context::set_scheduler
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context.rs:176:9
  33: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:486:9
  34: tokio::runtime::context::runtime::enter_runtime
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context/runtime.rs:65:16
  35: tokio::runtime::scheduler::multi_thread::worker::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:478:5
  36: tokio::runtime::scheduler::multi_thread::worker::Launch::launch::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:447:45
  37: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/task.rs:42:21
  38: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:328:17
  39: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/loom/std/unsafe_cell.rs:16:9
  40: tokio::runtime::task::core::Core<T,S>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:317:13
  41: tokio::runtime::task::harness::poll_future::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:485:19
  42: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panic/unwind_safe.rs:272:9
  43: std::panicking::try::do_call
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:552:40
  44: ___rust_try
  45: std::panicking::try
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:516:19
  46: std::panic::catch_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panic.rs:142:14
  47: tokio::runtime::task::harness::poll_future
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:473:18
  48: tokio::runtime::task::harness::Harness<T,S>::poll_inner
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:208:27
  49: tokio::runtime::task::harness::Harness<T,S>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:153:15
  50: tokio::runtime::task::raw::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:271:5
  51: tokio::runtime::task::raw::RawTask::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:201:18
  52: tokio::runtime::task::UnownedTask<S>::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/mod.rs:453:9
  53: tokio::runtime::blocking::pool::Task::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/pool.rs:159:9
  54: tokio::runtime::blocking::pool::Inner::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/pool.rs:513:17
  55: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/pool.rs:471:13
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

richardhenry avatar Mar 17 '24 22:03 richardhenry

@richardhenry,

It may be a race condition because the PubSubSink has been dropped and then try to unsubscribe in the background while the connection is reused...

Could you please call close(). await? on the sink to see if it fixes the problem

Cheers,

Michaël

mcatanzariti avatar Mar 18 '24 08:03 mcatanzariti

Still seeing the same errors even when explicitly calling sink.close().await? on the PubSubSplitSink before dropping it:

thread 'rocket-worker-thread' panicked at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:545:17:
[127.0.0.1:6381] Received unexpected message: Ok(SimpleString("PONG"))
stack backtrace:
   0: rust_begin_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:645:5
   1: core::panicking::panic_fmt
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panicking.rs:72:14
   2: rustis::network::network_handler::NetworkHandler::receive_result
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:545:17
   3: rustis::network::network_handler::NetworkHandler::handle_result::{{closure}}
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:410:25
   4: rustis::network::network_handler::NetworkHandler::network_loop::{{closure}}
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:153:47
   5: rustis::network::network_handler::NetworkHandler::connect::{{closure}}::{{closure}}
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:138:60
[...]

richardhenry avatar Mar 18 '24 18:03 richardhenry

Hi @richardhenry,

I could not reproduce the problem with this test

#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
#[serial]
async fn pub_sub() -> Result<()> {
    log_try_init();
    let manager = PooledClientManager::new(get_default_addr())?;
    let pool = crate::bb8::Pool::builder()
        .build(manager)
        .await?;

    loop {
        let (mut sink, mut stream) = pool.get().await.unwrap().create_pub_sub().split();
        sink.subscribe("foo").await?;
        pool.get().await.unwrap().publish("foo", "bar").await?;
        let message = stream.next().await.unwrap()?;
        assert_eq!(b"foo".to_vec(), message.channel);
        assert_eq!(b"bar".to_vec(), message.payload);

        sink.close().await?;

        let _value: String = pool.get().await.unwrap().get("key").await?;
    }
}

Could you help by modifying the test to reproduce the problem? It may be too simple...

mcatanzariti avatar Apr 06 '24 17:04 mcatanzariti

Hello,

I just fixed a bad status pb in the NetworkHandler implementation, let me know if it fixes your pb. https://github.com/dahomey-technologies/rustis/commit/b107cdbdfd1c553edf5a55ff81f63aea7dbcd662

mcatanzariti avatar Apr 06 '24 18:04 mcatanzariti

I'm struggling to repro this in a test also. I have tried spawning multiple tokio tasks using the pool concurrently and that also isn't triggering the problem. I'll try your latest commit with my application and see if I'm still seeing these issues. Thanks

richardhenry avatar Apr 07 '24 21:04 richardhenry

any news ?

mcatanzariti avatar Apr 18 '24 13:04 mcatanzariti

Still seeing similar issues unfortunately. The error I'm getting from rustis is Client("send failed because receiver is gone")

The debug logs I'm seeing from rustis immediately before the issue are:

[2024-04-20T04:57:56Z DEBUG rustis::network::cluster_connection] [127.0.0.1:6381] Analyzing command Command { name: "PING", args: CommandArgs { args: [] }, kill_connection_on_write: 0, command_seq: 104 }
[2024-04-20T04:57:56Z DEBUG rustis::network::cluster_connection] [127.0.0.1:6381] keys: [], slots: []
[2024-04-20T04:57:56Z DEBUG rustis::network::standalone_connection] [127.0.0.1:6381] Sending Command { name: "PING", args: CommandArgs { args: [] }, kill_connection_on_write: 0, command_seq: 104 }
[2024-04-20T04:57:56Z DEBUG rustis::network::standalone_connection] [127.0.0.1:6382] Sending Command { name: "PING", args: CommandArgs { args: [] }, kill_connection_on_write: 0, command_seq: 104 }
[2024-04-20T04:57:56Z DEBUG rustis::network::standalone_connection] [127.0.0.1:6380] Sending Command { name: "PING", args: CommandArgs { args: [] }, kill_connection_on_write: 0, command_seq: 104 }
[2024-04-20T04:57:56Z DEBUG rustis::network::standalone_connection] [127.0.0.1:6381] Received result SimpleString("PONG")
[2024-04-20T04:57:56Z DEBUG rustis::network::standalone_connection] [127.0.0.1:6382] Received result SimpleString("PONG")
[2024-04-20T04:57:56Z DEBUG rustis::network::standalone_connection] [127.0.0.1:6380] Received result SimpleString("PONG")
[2024-04-20T04:57:56Z DEBUG rustis::network::cluster_connection] [127.0.0.1:6381] no_response_policy
[2024-04-20T04:57:56Z DEBUG rustis::network::cluster_connection] [127.0.0.1:6381] no_response_policy

The backtrace:

thread 'rocket-worker-thread' panicked at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/b168d8e/src/network/network_handler.rs:556:17:
[127.0.0.1:6381] Received unexpected message: Ok(SimpleString("PONG"))
stack backtrace:
   0: rust_begin_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:645:5
   1: core::panicking::panic_fmt
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panicking.rs:72:14
   2: rustis::network::network_handler::NetworkHandler::receive_result
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/b168d8e/src/network/network_handler.rs:556:17
   3: rustis::network::network_handler::NetworkHandler::handle_result::{{closure}}
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/b168d8e/src/network/network_handler.rs:398:25
   4: rustis::network::network_handler::NetworkHandler::network_loop::{{closure}}
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/b168d8e/src/network/network_handler.rs:153:47
   5: rustis::network::network_handler::NetworkHandler::connect::{{closure}}::{{closure}}
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/b168d8e/src/network/network_handler.rs:138:60
   6: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/future/future.rs:125:9
   7: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/core.rs:328:17
   8: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/loom/std/unsafe_cell.rs:16:9
   9: tokio::runtime::task::core::Core<T,S>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/core.rs:317:13
  10: tokio::runtime::task::harness::poll_future::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:485:19
  11: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panic/unwind_safe.rs:272:9
  12: std::panicking::try::do_call
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:552:40
  13: ___rust_try
  14: std::panicking::try
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:516:19
  15: std::panic::catch_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panic.rs:142:14
  16: tokio::runtime::task::harness::poll_future
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:473:18
  17: tokio::runtime::task::harness::Harness<T,S>::poll_inner
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:208:27
  18: tokio::runtime::task::harness::Harness<T,S>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:153:15
  19: tokio::runtime::task::raw::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/raw.rs:271:5
  20: tokio::runtime::task::raw::RawTask::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/raw.rs:201:18
  21: tokio::runtime::task::LocalNotified<S>::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/mod.rs:427:9
  22: tokio::runtime::scheduler::multi_thread::worker::Context::run_task::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:576:13
  23: tokio::runtime::coop::with_budget
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/coop.rs:107:5
  24: tokio::runtime::coop::budget
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/coop.rs:73:5
  25: tokio::runtime::scheduler::multi_thread::worker::Context::run_task
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:575:9
  26: tokio::runtime::scheduler::multi_thread::worker::Context::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:526:24
  27: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:491:21
  28: tokio::runtime::context::scoped::Scoped<T>::set
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/context/scoped.rs:40:9
  29: tokio::runtime::context::set_scheduler::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/context.rs:176:26
  30: std::thread::local::LocalKey<T>::try_with
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/thread/local.rs:270:16
  31: std::thread::local::LocalKey<T>::with
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/thread/local.rs:246:9
  32: tokio::runtime::context::set_scheduler
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/context.rs:176:9
  33: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:486:9
  34: tokio::runtime::context::runtime::enter_runtime
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/context/runtime.rs:65:16
  35: tokio::runtime::scheduler::multi_thread::worker::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:478:5
  36: tokio::runtime::scheduler::multi_thread::worker::Launch::launch::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/multi_thread/worker.rs:447:45
  37: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/blocking/task.rs:42:21
  38: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/core.rs:328:17
  39: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/loom/std/unsafe_cell.rs:16:9
  40: tokio::runtime::task::core::Core<T,S>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/core.rs:317:13
  41: tokio::runtime::task::harness::poll_future::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:485:19
  42: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panic/unwind_safe.rs:272:9
  43: std::panicking::try::do_call
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:552:40
  44: ___rust_try
  45: std::panicking::try
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:516:19
  46: std::panic::catch_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panic.rs:142:14
  47: tokio::runtime::task::harness::poll_future
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:473:18
  48: tokio::runtime::task::harness::Harness<T,S>::poll_inner
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:208:27
  49: tokio::runtime::task::harness::Harness<T,S>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/harness.rs:153:15
  50: tokio::runtime::task::raw::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/raw.rs:271:5
  51: tokio::runtime::task::raw::RawTask::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/raw.rs:201:18
  52: tokio::runtime::task::UnownedTask<S>::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/task/mod.rs:464:9
  53: tokio::runtime::blocking::pool::Task::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/blocking/pool.rs:159:9
  54: tokio::runtime::blocking::pool::Inner::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/blocking/pool.rs:513:17
  55: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/blocking/pool.rs:471:13
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

richardhenry avatar Apr 20 '24 05:04 richardhenry

Please try the very last version where I fixed a bug if you try to subscribe to the same channel twice on the same client instance.

Before the fix, the pub/sub state could be messed up leading to the kind of error you get.

After the fix, you will get an explicit error on the second subscription.

mcatanzariti avatar Apr 20 '24 06:04 mcatanzariti

@richardhenry

As a test (not an actual fix), could you please increase the command timeout in the configuration? To 1s for instance.

It's like if the connection is still in pub sub mode and the sink.close().await? does not actually wait for the unsubscribe to complete and then the connection is put back into the pool in an intermediate status.

mcatanzariti avatar Apr 24 '24 07:04 mcatanzariti

new fix on release 0.13.3

mcatanzariti avatar May 25 '24 22:05 mcatanzariti

Thanks - I'll give this a try later this week

richardhenry avatar May 28 '24 18:05 richardhenry

Closing this issue for now. Please feel free to open it again if the problem persists

mcatanzariti avatar Aug 07 '24 21:08 mcatanzariti