ntex icon indicating copy to clipboard operation
ntex copied to clipboard

Notify Specific WebSocket Clients from REST API Using Ntex Framework

Open anchalshivank opened this issue 1 year ago • 0 comments

I am building a Rust-based service using the ntex framework, and my goal is to notify specific WebSocket clients in real-time when a REST API endpoint is triggered. Below are the key features I want to achieve:


Features:

  1. WebSocket Clients: Clients connect to the /ws/{machine_id} endpoint, where machine_id uniquely identifies each client.
  2. REST API Trigger: A REST API endpoint (/notify-machine) will be called with a machine_id and a message. This should notify the WebSocket client associated with that machine_id in real time.
  3. Shared State: I need to maintain an in-memory registry to map machine_id to WebSocket connections for efficient message delivery.

Current Setup:

  • WebSocket Connection: Implemented a WebSocket service with heartbeat monitoring using web::ws::WsSink and state management.
  • REST API: The goal is to trigger message delivery to WebSocket clients from the REST API handler.

Below is a simplified version of my current implementation: socket-service

WebSocket Service:

async fn ws_service(
    (sink, mut server, machine_id): (web::ws::WsSink, web::types::State<mpsc::UnboundedSender<ServerMessage>>, String)
) -> Result<impl Service<web::ws::Frame, Response = Option<web::ws::Message>, Error = io::Error>, web::Error> {
    let state = Rc::new(RefCell::new(WsState { hb: Instant::now() }));

    let (tx, rx) = oneshot::channel();
    rt::spawn(heartbeat(state.clone(), sink, rx));

    let service = fn_service(move |frame| {
        let item = match frame {
            web::ws::Frame::Ping(msg) => Some(web::ws::Message::Pong(msg)),
            web::ws::Frame::Pong(_) => None,
            web::ws::Frame::Text(text) => Some(web::ws::Message::Text(
                String::from_utf8(Vec::from(text.as_ref())).unwrap().into(),
            )),
            _ => None,
        };
        ready(Ok(item))
    });

    Ok(chain(service).and_then(fn_shutdown(move || {
        let _ = tx.send(());
    })))
}

REST API and State Management:

async fn ws_index(
    req: web::HttpRequest,
    state: web::types::State<mpsc::UnboundedSender<ServerMessage>>,
    path: web::types::Path<String>
) -> Result<web::HttpResponse, web::Error> {
    let machine_id = path.clone();
    let config = map_config(fn_factory_with_config(ws_service), move |cfg| {
        (cfg, state.clone(), machine_id.clone())
    });
    web::ws::start(req, config.clone()).await
}

async fn start() -> UnboundedSender<ServerMessage> {
    let (tx, mut rx) = mpsc::unbounded();
    rt::Arbiter::new().exec_fn(move || {
        rt::spawn(async move {
            while let Some(msg) = rx.next().await {
                info!("Received message: {:?}", msg);
            }
            rt::Arbiter::current().stop();
        });
    });
    tx
}

#[ntex::main]
async fn main() -> std::io::Result<()> {
    env_logger::init();

    let state = start().await;
    web::server(move || {
        web::App::new()
            .state(state.clone())
            .wrap(web::middleware::Logger::default())
            .service(web::resource("/ws/{machine_id}").route(web::get().to(ws_index)))
            .service(notify_machine)
            .service(health)
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

Key Issues:

  1. Mapping Machine ID to Connections: How do I efficiently map machine_id to WebSocket connections (WsSink) for targeted communication?
  2. Sending Messages: How do I send a message to a specific WebSocket client (machine_id) when the /notify-machine REST API is triggered?
  3. State Management: What is the best approach to maintain the state of connected clients and their associations with machine_id?

Desired Behavior:

  1. When a client connects to /ws/{machine_id}, the connection should be registered with its machine_id.
  2. When the /notify-machine endpoint receives a POST request with a machine_id and a message, the corresponding WebSocket client should receive that message in real time.
  3. The WebSocket clients are behind NAT, so I cannot use HTTP to reach them. WebSockets are the only viable solution for real-time communication.

What I’ve Tried:

I referred to [ntex examples](https://github.com/ntex-rs/examples), but did not help. I am unsure how to manually send messages to specific clients from the REST API. For instance, in this [example snippet](https://github.com/anchalshivank/mt-notify-service/blob/8ca284333eb44eeadd8568631b4e86d61e7b03a8/src/main.rs#L59), I see how to send responses, but I need help sending messages proactively to connected clients.

Any guidance, examples, or advice for achieving this functionality would be greatly appreciated!

anchalshivank avatar Dec 09 '24 18:12 anchalshivank