msg-rs
msg-rs copied to clipboard
Connection to pub sockets gets dropped before subscribe receive message
Snippet:
let msg = sub_socket.next().await.unwrap();
println!("Received message: {:?}", msg);
subsocket shutdown triggered
impl<T: Transport<A>, A: Address> Drop for SubSocket<T, A> {
fn drop(&mut self) {
// Try to tell the driver to gracefully shut down.
let _ = self.to_driver.try_send(Command::Shutdown);
}
}
logs
Received command: Connect { endpoint: 127.0.0.1:9899 }
2025-02-21T20:27:25.353317Z INFO arbitrage_monitor::worker: Stats: SocketStats { session_stats: RwLock { data: {} } }
2025-02-21T20:27:25.353480Z DEBUG msg_socket::sub::driver: Resetting publisher at 127.0.0.1:9899
2025-02-21T20:27:25.353476Z INFO arbitrage_monitor::worker: Sent message: 0.334
2025-02-21T20:27:25.353506Z DEBUG msg_socket::sub::driver: Received command: Subscribe { topic: "dex-v2" }
2025-02-21T20:27:25.353520Z INFO msg_socket::sub::driver: Subscribed to topic topic="dex-v2" n_publishers=1
2025-02-21T20:27:25.353782Z ERROR msg_socket::sub::driver: Error connecting to publisher err=Os { code: 61, kind: ConnectionRefused, message: "Connection refused" } addr=127.0.0.1:9899
2025-02-21T20:27:25.456304Z DEBUG msg_socket::sub::driver: Retrying connection to 127.0.0.1:9899 backoff=200ms
2025-02-21T20:27:25.457198Z ERROR msg_socket::sub::driver: Error connecting to publisher err=Os { code: 61, kind: ConnectionRefused, message: "Connection refused" } addr=127.0.0.1:9899
2025-02-21T20:27:25.659166Z DEBUG msg_socket::sub::driver: Retrying connection to 127.0.0.1:9899 backoff=400ms
2025-02-21T20:27:25.660046Z ERROR msg_socket::sub::driver: Error connecting to publisher err=Os { code: 61, kind: ConnectionRefused, message: "Connection refused" } addr=127.0.0.1:9899
2025-02-21T20:27:26.061855Z DEBUG msg_socket::sub::driver: Retrying connection to 127.0.0.1:9899 backoff=800ms
CC: @mempirate
It seems that the subscriber just can't reach the endpoint, could you share a bit more code regarding the publisher? Are you sure it isn't dropped before the subscriber tries to connect?
The shutdown on SubSocket isn't getting called (from what I can tell in the logs). The publisher always gets reset when trying to connect to it initially so it's queued for retries.