ws-rs

Timeouts are not cleaned up.

Open zoumi opened this issue 6 years ago • 3 comments

I cloned the master branch and found that ws-rs does not clean up timeouts. Here are the relevant pieces of my code:

use ws;
use ::*;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use ws::util::Token;
use ws::util::Timeout;
use std::str::from_utf8;
use std::time::{Duration,Instant,SystemTime};

lazy_static!{
    static ref LAST_TIME_CLOSE: Mutex<SystemTime>=Mutex::new(SystemTime::now());
}

pub struct WSHandler{
    id:usize,
    //ping_timeout: Option<Timeout>,
    expire_timeout: Option<Timeout>,
    pub out: ws::Sender
}

const EXPIRE: Token = Token(2);
const EXPIRE_TIMEOUT:u64 = 30_000;

impl ws::Handler for WSHandler{
    fn on_shutdown(&mut self) { 
        debug!("shutdown at con:{:?}",self.id); 
    }

    fn on_open(&mut self, shake: ws::Handshake) -> ws::Result<()> {  
        debug!("open new con:{:?}",self.id); 
        self.out.timeout(EXPIRE_TIMEOUT, EXPIRE)
    }

    fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
        match msg{
            ws::Message::Binary(b) =>{
                debug!("ws receive: {:?} con:{:?}",b,self.id);
                error!("does not support Binary data:{:?}",b);
            }
            ws::Message::Text(t) =>{
                if(t=="init"){
                    //close all other connections
                    debug!("clean connections...");
                    let mut p = ::WS_CON_POOL.write().unwrap();
                    p.retain(|&k,&mut ref con|{
                        if(k==self.id){
                            true
                        }else{
                            con.close(ws::CloseCode::Away);
                            false
                        }
                    });
                    return Ok(());
                }
                if(t=="heartbeat"){   //reset timeout for this con
                    return Ok(());   
                }
                debug!("ws receive: {:?} con:{:?}",t,self.id);
            }
        }
        Ok(())
    }
    fn on_close(&mut self, code: ws::CloseCode, reason: &str) {
        if let Some(t) = self.expire_timeout.take() {
            self.out.cancel(t).unwrap();
        }
        let mut p = ::WS_CON_POOL.write().unwrap();
        (*p).remove(&self.id);
    }

    fn on_error(&mut self, err: ws::Error) { 
        error!("ws con with id:{:?} get error:{:?}",self.id,err);
    }
    fn on_timeout(&mut self, event: ws::util::Token) -> ws::Result<()> {
        match event {
            EXPIRE => {
                warn!("expire timeout accur,will close con:{:?}",self.id);
                //TODO: workaround for a ws bug: the timeout is not cleaned up
                static LAST_DEL_ID: AtomicUsize = ATOMIC_USIZE_INIT;
                let mut g = LAST_TIME_CLOSE.lock().unwrap();
                if(g.elapsed().unwrap()>Duration::from_secs(1) && 
                   LAST_DEL_ID.load(Ordering::SeqCst)!=self.id){
                    self.out.close(ws::CloseCode::Away);
                    *g=SystemTime::now();
                    LAST_DEL_ID.store(self.id,Ordering::SeqCst);
                }
                Ok(())
            }
            // No other timeouts are possible
            _ => Err(ws::Error::new(ws::ErrorKind::Internal, "Invalid timeout token encountered!")),
        }
    }
    fn on_new_timeout(&mut self, event: Token, timeout: Timeout) -> ws::Result<()> {
         // Cancel the old timeout and replace.
        if event == EXPIRE {
            debug!("con:{:?} timeout reset",self.id);
            if let Some(t) = self.expire_timeout.take() {
                try!(self.out.cancel(t))
            }
            self.expire_timeout = Some(timeout)
        } 
        Ok(())   
    }
    fn on_frame(&mut self, frame: ws::Frame) -> ws::Result<Option<ws::Frame>> {
        // Some activity has occurred, so reset the expiration
        try!(self.out.timeout(EXPIRE_TIMEOUT, EXPIRE));
        Ok(Some(frame))
    }
}

fn handle_msg(m: Msg,s: &ws::Sender){
}

pub fn ws_server(){
    loop{
        if let Err(error) = ws::listen("127.0.0.1:3012", |out| {
            let Token(id)=out.token();
            {
                //collect to pool we may use later
                let mut p = ::WS_CON_POOL.write().unwrap();
                debug!("create con:{:?}",out.token());
                (*p).insert(id,out.clone());
            }
            WSHandler{
                id:id,
                out: out,
                expire_timeout: None,
            }
        }) {
            // Inform the user of failure
            error!("Failed to create WebSocket due to {:?}", error);
        }
    }
}

The output:

[14:20:34] DEBUG con:3 timeout reset
[14:20:39] DEBUG con:3 timeout reset
[14:20:39] DEBUG con:1 timeout reset
[14:20:39] DEBUG con:0 timeout reset
[14:20:44] WARN expire timeout accur,will close con:2
[14:20:44] DEBUG con:1 timeout reset
[14:20:44] DEBUG con:0 timeout reset
[14:20:45] DEBUG create con:Token(2)
[14:20:45] DEBUG open new con:2
[14:20:45] DEBUG con:2 timeout reset
[14:20:49] DEBUG con:2 timeout reset
[14:20:49] DEBUG con:0 timeout reset
[14:20:49] DEBUG con:1 timeout reset
[14:20:54] DEBUG con:1 timeout reset
[14:20:54] DEBUG con:0 timeout reset
[14:20:54] DEBUG con:2 timeout reset
[14:20:59] DEBUG con:0 timeout reset
[14:20:59] DEBUG con:1 timeout reset
[14:20:59] DEBUG con:2 timeout reset
[14:21:04] DEBUG con:1 timeout reset
[14:21:04] DEBUG con:0 timeout reset
[14:21:04] DEBUG con:2 timeout reset
[14:21:09] WARN expire timeout accur,will close con:3
[14:21:09] DEBUG con:1 timeout reset
[14:21:09] DEBUG con:0 timeout reset
[14:21:10] DEBUG create con:Token(3)
[14:21:10] DEBUG open new con:3
[14:21:10] DEBUG con:3 timeout reset
[14:21:14] WARN expire timeout accur,will close con:2
[14:21:14] DEBUG con:1 timeout reset
[14:21:14] DEBUG con:0 timeout reset
[14:21:15] DEBUG create con:Token(2)
[14:21:15] DEBUG open new con:2
[14:21:15] DEBUG con:2 timeout reset
[14:21:19] DEBUG con:0 timeout reset
[14:21:19] DEBUG con:1 timeout reset

zoumi avatar Nov 12 '18 00:11 zoumi

More debug output:

zoumi@zoumi-PC MINGW64 /h/work/rust_projects/com_tool (master)
$ /e/rustdir/target/debug/ComTool.exe
"D:\\zh-WORK\\rust\\com_tool"
[15:46:09] INFO The application is starting...
[15:46:09] INFO Application's working dir is :"H:\\work\\rust_projects\\com_tool"
[15:46:09] DEBUG "http://127.0.0.1:24050/com_tool.html"
[15:46:09] DEBUG Server listening on 0.0.0.0:24050
[15:46:09] DEBUG Running accept thread
[15:46:09] DEBUG create con:Token(0)
[15:46:09] DEBUG open new con:0
[15:46:09] DEBUG con:0 timeout reset
[15:46:09] DEBUG clean connections...
[15:46:09] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:46:09] DEBUG ws receive: "{\"msg_type\":\"TGetPorts\",\"msg_data\":null}" con:0
[15:46:09] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:46:09] DEBUG ws receive: "{\"msg_type\":\"TGetConfig\",\"msg_data\":{\"Config\":[\"\",null]}}" con:0
[15:46:09] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
get com port:SerialPortInfo { port_name: "COM1", port_type: Unknown }
get com port:SerialPortInfo { port_name: "COM2", port_type: Unknown }
[15:46:09] DEBUG WS_CON_POOL ids:[0]
[15:46:10] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:46:10] DEBUG create con:Token(0)
[15:46:10] DEBUG open new con:0
[15:46:10] DEBUG con:0 timeout reset
[15:46:10] DEBUG create con:Token(1)
[15:46:10] DEBUG open new con:1
[15:46:10] DEBUG con:1 timeout reset
[15:46:10] DEBUG create con:Token(2)
[15:46:10] DEBUG open new con:2
[15:46:10] DEBUG con:2 timeout reset
[15:46:11] DEBUG create con:Token(3)
[15:46:11] DEBUG open new con:3
[15:46:11] DEBUG con:3 timeout reset
[15:46:15] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:46:15] DEBUG con:1 timeout reset
[15:46:15] DEBUG con:3 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
your cancel:Some(Timeout { connection: Token(3), event: Token(2) })
[15:46:20] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:46:20] DEBUG con:0 timeout reset
[15:46:20] DEBUG con:3 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
your cancel:Some(Timeout { connection: Token(3), event: Token(2) })
[15:46:25] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:46:25] DEBUG con:1 timeout reset
[15:46:25] DEBUG con:3 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
your cancel:Some(Timeout { connection: Token(3), event: Token(2) })
[15:46:30] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:46:30] DEBUG con:1 timeout reset
[15:46:30] DEBUG con:3 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
your cancel:Some(Timeout { connection: Token(3), event: Token(2) })
[15:46:35] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:46:35] DEBUG con:1 timeout reset
[15:46:35] DEBUG con:3 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
your cancel:Some(Timeout { connection: Token(3), event: Token(2) })
[15:46:40] WARN expire timeout accur,will close con:2
[15:46:40] WARN expire timeout accur,will close con:0
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
your cancel:None
[15:46:40] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:46:41] DEBUG create con:Token(2)
[15:46:41] DEBUG open new con:2
[15:46:41] DEBUG con:2 timeout reset
[15:46:42] DEBUG create con:Token(0)
[15:46:42] DEBUG open new con:0
[15:46:42] DEBUG con:0 timeout reset
[15:46:45] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:46:45] DEBUG con:2 timeout reset
[15:46:45] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:46:50] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:46:50] DEBUG con:2 timeout reset
[15:46:50] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:46:55] DEBUG con:2 timeout reset
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
[15:46:55] DEBUG con:1 timeout reset
[15:46:55] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:47:00] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:47:00] DEBUG con:2 timeout reset
[15:47:00] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:47:05] WARN expire timeout accur,will close con:3
your cancel:None
[15:47:05] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:47:05] DEBUG con:2 timeout reset
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
[15:47:06] DEBUG create con:Token(3)
[15:47:06] DEBUG open new con:3
[15:47:06] DEBUG con:3 timeout reset
[15:47:10] WARN expire timeout accur,will close con:2
[15:47:10] WARN expire timeout accur,will close con:0
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:47:10] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:47:11] DEBUG create con:Token(0)
[15:47:11] DEBUG open new con:0
[15:47:11] DEBUG con:0 timeout reset
[15:47:12] DEBUG create con:Token(2)
[15:47:12] DEBUG open new con:2
[15:47:12] DEBUG con:2 timeout reset
[15:47:15] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:47:15] DEBUG con:0 timeout reset
[15:47:15] DEBUG con:2 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
[15:47:20] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:47:20] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:47:20] DEBUG con:2 timeout reset
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
[15:47:25] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:47:25] DEBUG con:0 timeout reset
[15:47:25] DEBUG con:2 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
[15:47:30] DEBUG con:1 timeout reset
[15:47:30] DEBUG con:0 timeout reset
[15:47:30] DEBUG con:2 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
[15:47:35] WARN expire timeout accur,will close con:3
your cancel:Some(Timeout { connection: Token(3), event: Token(2) })
[15:47:35] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:47:35] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:47:35] DEBUG create con:Token(3)
[15:47:35] DEBUG open new con:3
[15:47:35] DEBUG con:3 timeout reset
[15:47:40] WARN expire timeout accur,will close con:0
[15:47:40] WARN expire timeout accur,will close con:2
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
[15:47:40] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:47:40] DEBUG create con:Token(2)
[15:47:40] DEBUG open new con:2
[15:47:40] DEBUG con:2 timeout reset
[15:47:41] DEBUG create con:Token(0)
[15:47:41] DEBUG open new con:0
[15:47:41] DEBUG con:0 timeout reset
[15:47:45] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:47:45] DEBUG con:2 timeout reset
[15:47:45] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:47:50] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:47:50] DEBUG con:2 timeout reset
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
[15:47:50] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:47:55] DEBUG con:2 timeout reset
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
[15:47:55] DEBUG con:0 timeout reset
[15:47:55] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:48:00] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:48:00] DEBUG con:2 timeout reset
[15:48:00] DEBUG con:0 timeout reset
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:48:05] WARN expire timeout accur,will close con:3
your cancel:Some(Timeout { connection: Token(3), event: Token(2) })
[15:48:05] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:48:05] DEBUG con:2 timeout reset
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
[15:48:06] DEBUG create con:Token(3)
[15:48:06] DEBUG open new con:3
[15:48:06] DEBUG con:3 timeout reset
[15:48:10] WARN expire timeout accur,will close con:2
[15:48:10] WARN expire timeout accur,will close con:0
your cancel:Some(Timeout { connection: Token(2), event: Token(2) })
your cancel:Some(Timeout { connection: Token(0), event: Token(2) })
[15:48:10] DEBUG con:1 timeout reset
your cancel:Some(Timeout { connection: Token(1), event: Token(2) })
[15:48:11] DEBUG create con:Token(0)

Two observations:

  1. Is the connection with Token(0) created twice?
  2. The first wrong expiration happens at:
     [15:47:10] WARN expire timeout accur,will close con:2
     [15:47:10] WARN expire timeout accur,will close con:0

zoumi avatar Nov 12 '18 14:11 zoumi

I think I've partially tracked this down, though there are certainly still some mysteries.

This is what seems to happen when it's working correctly:

  1. Normally every time a frame comes in, on_frame fires which adds a new timeout.
  2. Once the signal to create a new timeout goes out and gets processed, on_new_timeout is called informing us of the new timeout.
  3. on_new_timeout notices that this is a duplicate timeout, so it cancels the old timeout and stores the new one.

The bug happens when the frame is a Close:

  1. When a Close frame comes in (meaning the connection is about to close), on_frame fires which adds a new timeout.
  2. The signal to create a new timeout gets through, but seemingly everything gets shut down (because the connection is closing) before it has a chance to call on_new_timeout.
  3. Because on_new_timeout is never called, two timeouts remain active.
  4. on_close gets called and cancels the one timeout that we have a reference to, but the new timeout we just created in on_frame is still active.

Only adding a new timeout when the frame isn't a Close seems to fix it, something like this:

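// `self.sender` is the handler's ws::Sender (named `out` in the code above).
// `DefaultHandler` is assumed to be a local unit struct with an empty
// `impl ws::Handler for DefaultHandler {}`, used to reach the default on_frame.
fn on_frame(&mut self, frame: ws::Frame) -> ws::Result<Option<ws::Frame>> {
    // Skip the reset for Close frames: the connection is about to go away.
    if frame.opcode() != ws::OpCode::Close {
        self.sender.timeout(30_000, EXPIRE)?;
    }
    DefaultHandler.on_frame(frame)
}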

Seems like there's a deeper bug which causes on_new_timeout to never get called in this scenario, but at least this gets us back working again for now.
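
To make the sequence above easier to follow, here is a toy model in plain Rust. It does not use ws-rs and is not how the library is implemented: `Timers`, `Conn`, and `leaked_timeouts` are hypothetical names invented for this sketch. It only assumes what is described above, namely that the handler can cancel just the one timeout it holds, and that the timeout scheduled for a Close frame is never reported through on_new_timeout.

```rust
use std::collections::HashSet;

/// Toy stand-in for the event loop's timer table (not ws-rs internals).
struct Timers {
    next_id: u32,
    active: HashSet<u32>,
}

impl Timers {
    fn new() -> Self {
        Timers { next_id: 0, active: HashSet::new() }
    }

    /// Registers a new timeout and returns its id.
    fn schedule(&mut self) -> u32 {
        let id = self.next_id;
        self.next_id += 1;
        self.active.insert(id);
        id
    }

    fn cancel(&mut self, id: u32) {
        self.active.remove(&id);
    }
}

/// Toy handler that remembers only the latest timeout it was told about.
struct Conn {
    stored: Option<u32>,
}

impl Conn {
    /// Mirrors on_new_timeout: cancel the old timeout, keep the new one.
    fn on_new_timeout(&mut self, timers: &mut Timers, id: u32) {
        if let Some(old) = self.stored.take() {
            timers.cancel(old);
        }
        self.stored = Some(id);
    }

    /// Mirrors on_close: cancel only the timeout we hold a reference to.
    fn on_close(&mut self, timers: &mut Timers) {
        if let Some(id) = self.stored.take() {
            timers.cancel(id);
        }
    }
}

/// Processes `data_frames` ordinary frames and then one Close frame.
/// Returns how many timeouts are still active after on_close.
fn leaked_timeouts(data_frames: usize, skip_close: bool) -> usize {
    let mut timers = Timers::new();
    let mut conn = Conn { stored: None };
    for _ in 0..data_frames {
        let id = timers.schedule(); // on_frame schedules a timeout
        conn.on_new_timeout(&mut timers, id); // and the handler is informed
    }
    if !skip_close {
        // Close frame: the timeout is scheduled, but (per the assumption
        // above) on_new_timeout is never delivered, so nobody holds its id.
        timers.schedule();
    }
    conn.on_close(&mut timers);
    timers.active.len()
}

fn main() {
    println!("without the Close check: {}", leaked_timeouts(3, false));
    println!("with the Close check:    {}", leaked_timeouts(3, true));
}
```

In this model one timeout is left over per closed connection unless Close frames are skipped, which would match a stale timeout firing later on whichever connection reuses the token.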

wintersieck avatar Nov 19 '18 22:11 wintersieck

Thanks for the great library! @ajwinter, your workaround does not work for me. I noticed that it happens when timeouts are scheduled very quickly. It always happens at exactly 501 scheduled timeouts that have not yet been cancelled. So it looks like the queue of max_connections * queue_size == 100 * 5 is full, which leads to a deadlock.
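
The arithmetic behind the 501 figure can be checked with a small standalone sketch. It uses `std::sync::mpsc::sync_channel` purely as a stand-in for a bounded queue; it makes no claim about which queue ws-rs uses internally, and `first_rejected` is a hypothetical helper written for this illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Fills a bounded queue without ever draining it and returns the 1-based
/// index of the first request that no longer fits.
fn first_rejected(max_connections: usize, queue_size: usize) -> usize {
    let capacity = max_connections * queue_size;
    // `_rx` is kept alive so the channel stays connected but is never read.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut n = 0;
    loop {
        n += 1;
        match tx.try_send(n) {
            Ok(()) => continue,
            Err(TrySendError::Full(_)) => return n,
            Err(TrySendError::Disconnected(_)) => {
                unreachable!("receiver is kept alive for the whole loop")
            }
        }
    }
}

fn main() {
    // With the values quoted above (100 * 5), the 501st request is the
    // first one that does not fit.
    println!("{}", first_rejected(100, 5));
}
```

With a blocking send instead of `try_send`, request 501 would wait for room rather than return an error. If the sender is also the only party able to drain the queue, that wait never ends, which would look like the deadlock described above.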

tikste avatar Mar 24 '19 01:03 tikste