msg-rs icon indicating copy to clipboard operation
msg-rs copied to clipboard

Connection to pub sockets gets dropped before subscribe receive message

Open mzbelle opened this issue 10 months ago • 1 comments

Snippet:

let msg = sub_socket.next().await.unwrap();
println!("Received message: {:?}", msg);

subsocket shutdown triggered

impl<T: Transport<A>, A: Address> Drop for SubSocket<T, A> {
    fn drop(&mut self) {
        // Try to tell the driver to gracefully shut down.
        let _ = self.to_driver.try_send(Command::Shutdown);
    }
}

logs

Received command: Connect { endpoint: 127.0.0.1:9899 }
2025-02-21T20:27:25.353317Z  INFO arbitrage_monitor::worker: Stats: SocketStats { session_stats: RwLock { data: {} } }
2025-02-21T20:27:25.353480Z DEBUG msg_socket::sub::driver: Resetting publisher at 127.0.0.1:9899
2025-02-21T20:27:25.353476Z  INFO arbitrage_monitor::worker: Sent message: 0.334
2025-02-21T20:27:25.353506Z DEBUG msg_socket::sub::driver: Received command: Subscribe { topic: "dex-v2" }
2025-02-21T20:27:25.353520Z  INFO msg_socket::sub::driver: Subscribed to topic topic="dex-v2" n_publishers=1
2025-02-21T20:27:25.353782Z ERROR msg_socket::sub::driver: Error connecting to publisher err=Os { code: 61, kind: ConnectionRefused, message: "Connection refused" } addr=127.0.0.1:9899
2025-02-21T20:27:25.456304Z DEBUG msg_socket::sub::driver: Retrying connection to 127.0.0.1:9899 backoff=200ms
2025-02-21T20:27:25.457198Z ERROR msg_socket::sub::driver: Error connecting to publisher err=Os { code: 61, kind: ConnectionRefused, message: "Connection refused" } addr=127.0.0.1:9899
2025-02-21T20:27:25.659166Z DEBUG msg_socket::sub::driver: Retrying connection to 127.0.0.1:9899 backoff=400ms
2025-02-21T20:27:25.660046Z ERROR msg_socket::sub::driver: Error connecting to publisher err=Os { code: 61, kind: ConnectionRefused, message: "Connection refused" } addr=127.0.0.1:9899
2025-02-21T20:27:26.061855Z DEBUG msg_socket::sub::driver: Retrying connection to 127.0.0.1:9899 backoff=800ms

CC: @mempirate

mzbelle avatar Feb 21 '25 20:02 mzbelle

It seems that the subscriber just can't reach the endpoint, could you share a bit more code regarding the publisher? Are you sure it isn't dropped before the subscriber tries to connect?

The shutdown on SubSocket isn't getting called (from what I can tell in the logs). The publisher always gets reset when trying to connect to it initially so it's queued for retries.

mempirate avatar Feb 28 '25 13:02 mempirate