tokio-core

Allow try_clone on TcpStream

Open stackjohn opened this issue 7 years ago • 7 comments

Is it possible to implement the try_clone method for the tokio TcpStream type?

I'm using the echo server example. Eventually I hope to iterate through all the currently connected sockets and broadcast a message to them one by one.

Let's say I have the following code:

    // Bind the server's socket
    let addr = "127.0.0.1:12345".parse().unwrap();
    let tcp = TcpListener::bind(&addr, &handle).unwrap();

    // Shared list of connected sockets
    let connections = Arc::new(Mutex::new(Vec::new()));

    let conn = connections.clone();
    thread::spawn(move || {
        loop {
            let i = conn.lock().unwrap().len();
            // Here I am hoping to eventually be able to write to each one of the sockets in the vector.
            println!("There are {} connections", i);
            stdout().flush().unwrap();
            thread::sleep(Duration::from_secs(10));
        }
    });

    // Iterate incoming connections
    let server = tcp.incoming().for_each(|(tcp, _)| {

        let connection = Arc::new(Mutex::new(tcp));   
 
        connections.lock().unwrap().push(connection.lock().try_clone());
        
        // Split up the read and write halves
        let (reader, writer) = connection.lock().unwrap().split();

        // Future of the copy
        let bytes_copied = io::copy(reader, writer);

        // ... after which we'll print what happened
        let handle_conn = bytes_copied.map(|(n, _, _)| {
            println!("wrote {} bytes", n)
        }).map_err(|err| {
            println!("IO error {:?}", err)
        });

        // Spawn the future as a concurrent task
        handle.spawn(handle_conn);

        Ok(())
    });

The code fails to compile as try_clone is not implemented.

        connections.lock().unwrap().push(connection.lock().try_clone());

It appears that TcpStream in the standard library does allow the cloning of TcpStream objects.

Is there a way to work around this? Apologies, I am very new to Rust, so what I am trying to do might not make any sense at all.

stackjohn avatar Apr 17 '17 19:04 stackjohn

Thanks for the report! Unfortunately, this is a pretty difficult notion on the backend, as the semantics of duplicate file descriptors and epoll notifications tend to get... interesting (to say nothing of the Windows semantics). I don't think it's impossible, but I don't think it's easy either, and we haven't yet put much thought into how to do this.

For now could you use something like Arc<TcpStream>? That way you can read/write and still clone it to get multiple references.
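
To illustrate the Arc idea, here is a minimal sketch that uses only the standard library's blocking std::net::TcpStream rather than tokio-core's type. It relies on Write being implemented for &TcpStream, so a shared reference is enough to write. The function name send_via_shared_stream and the loopback setup are hypothetical, chosen only for this example, and the same pattern is assumed (not verified here) to carry over to the tokio-core stream.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;

/// Sends `msg` plus a trailing `!` over a loopback connection whose client
/// socket is shared through an `Arc`, and returns what the server side read.
fn send_via_shared_stream(msg: &'static [u8]) -> std::io::Result<Vec<u8>> {
    // Port 0 asks the OS for any free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let shared = Arc::new(TcpStream::connect(addr)?);
    let (mut server_side, _) = listener.accept()?;

    // A second handle to the same stream; no try_clone involved.
    let handle = Arc::clone(&shared);
    let writer = thread::spawn(move || {
        // `Write` is implemented for `&TcpStream`, so `&` access suffices.
        let mut stream: &TcpStream = &handle;
        stream.write_all(msg)
    });
    writer.join().expect("writer thread panicked")?;

    // The original handle is still usable after the clone was moved away.
    let mut stream: &TcpStream = &shared;
    stream.write_all(b"!")?;

    let mut buf = vec![0u8; msg.len() + 1];
    server_side.read_exact(&mut buf)?;
    Ok(buf)
}

fn main() -> std::io::Result<()> {
    let received = send_via_shared_stream(b"hello")?;
    println!("server received {} bytes", received.len());
    Ok(())
}
```

Storing Arc clones like `handle` in the connections vector would give the broadcasting thread write access without taking the stream away from the task that handles the connection.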

alexcrichton avatar Apr 19 '17 01:04 alexcrichton

Thanks for your reply, much appreciated. I will take a look at Arc<TcpStream>.

I see there is a good example here.

stackjohn avatar Apr 19 '17 07:04 stackjohn

Relates to https://github.com/carllerche/mio/issues/361

carllerche avatar Apr 19 '17 14:04 carllerche

A few notes…

Arc<TcpStream> is not usable for reader/writer clones passed to concurrent async blocks.

TcpStream::split() is not usable either, because the reader and writer are not independent and cannot each be moved into its own async block.

The use case of the standard TcpStream::try_clone() is to create two completely independent channels (the dependency is moved to the operating system internals) for reading and writing. The provided example doesn't perform concurrent operations.

However, there is TcpStream::into_split() (which possibly didn't exist when @stackjohn asked) that does the job. It returns a reader/writer pair whose halves can be moved into two separate tasks. It doesn't look like an OS-level solution based on dup(); rather, the TcpStream seems to be moved into heap-allocated memory shared by the reader/writer pair.
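
For comparison, the standard-library try_clone() use case mentioned above can be sketched as follows. This is a blocking, threads-based example using std::net::TcpStream only, not tokio's into_split(). The function name echo_with_cloned_handles and the one-line echo peer are hypothetical and exist only to make the sketch self-contained.

```rust
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Writes `line` from one thread and reads the echoed reply from another,
/// using two handles to the same socket obtained with `try_clone`.
fn echo_with_cloned_handles(line: &str) -> std::io::Result<String> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    // Echo peer: sends back a single line, then exits.
    let peer = thread::spawn(move || -> std::io::Result<()> {
        let (mut stream, _) = listener.accept()?;
        let mut buf = String::new();
        BufReader::new(&stream).read_line(&mut buf)?;
        stream.write_all(buf.as_bytes())
    });

    let stream = TcpStream::connect(addr)?;
    // The clone is a second OS-level handle to the same connection.
    let read_half = stream.try_clone()?;
    let mut write_half = stream;

    // Reader and writer each own a handle and run independently.
    let reader = thread::spawn(move || -> std::io::Result<String> {
        let mut reply = String::new();
        BufReader::new(read_half).read_line(&mut reply)?;
        Ok(reply)
    });
    let payload = format!("{}\n", line);
    let writer = thread::spawn(move || write_half.write_all(payload.as_bytes()));

    writer.join().expect("writer thread panicked")?;
    let reply = reader.join().expect("reader thread panicked")?;
    peer.join().expect("peer thread panicked")?;
    Ok(reply.trim_end().to_string())
}

fn main() -> std::io::Result<()> {
    println!("echoed: {}", echo_with_cloned_handles("ping")?);
    Ok(())
}
```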

pavlix avatar Sep 18 '21 01:09 pavlix

You can create a tokio broadcast channel and subscribe to it in the TcpStream handler function. When a new message arrives, the handler writes it to its own TcpStream, so there is no need to clone the stream.
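
The shape of that design can be sketched without tokio. Note that this example swaps in a different mechanism: it uses one std::sync::mpsc channel per connection and blocking threads, not tokio::sync::broadcast. The function name broadcast_to_clients and the loopback clients are hypothetical. The point it shows is that each handler owns its stream outright and only a channel endpoint is shared.

```rust
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Opens `n_clients` loopback connections, broadcasts `msg` to all of them,
/// and returns the line each client received.
fn broadcast_to_clients(n_clients: usize, msg: &str) -> std::io::Result<Vec<String>> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let mut subscribers: Vec<Sender<String>> = Vec::new();
    let mut handlers = Vec::new();
    let mut clients = Vec::new();

    for _ in 0..n_clients {
        let client = TcpStream::connect(addr)?;
        let (mut stream, _) = listener.accept()?;
        clients.push(client);

        // The handler owns `stream`; the rest of the program only keeps `tx`.
        let (tx, rx) = channel::<String>();
        subscribers.push(tx);
        handlers.push(thread::spawn(move || -> std::io::Result<()> {
            // The loop ends once the sender has been dropped.
            for line in rx {
                stream.write_all(line.as_bytes())?;
            }
            Ok(())
        }));
    }

    // Broadcasting is one send per subscriber.
    for tx in &subscribers {
        tx.send(format!("{}\n", msg)).expect("handler exited early");
    }
    drop(subscribers); // close the channels so the handlers finish

    for handler in handlers {
        handler.join().expect("handler thread panicked")?;
    }

    let mut received = Vec::new();
    for client in clients {
        let mut line = String::new();
        BufReader::new(client).read_line(&mut line)?;
        received.push(line.trim_end().to_string());
    }
    Ok(received)
}

fn main() -> std::io::Result<()> {
    for line in broadcast_to_clients(3, "hello everyone")? {
        println!("client got: {}", line);
    }
    Ok(())
}
```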

ramiroaisen avatar Oct 11 '22 19:10 ramiroaisen

Another good option is to convert the tokio TcpStream into a std TcpStream using TcpStream::into_std(), then clone the std TcpStream using its TcpStream::try_clone() method.

Then you can convert the clones back with TcpStream::from_std:

// Convert to a std stream, then duplicate the handle twice
let std_tcp_stream = tokio_tcp_stream.into_std()?;
let std_tcp_stream_clone_one = std_tcp_stream.try_clone()?;
let std_tcp_stream_clone_two = std_tcp_stream.try_clone()?;

// Both refer to the same underlying socket
let new_tokio_tcp_stream_one = TcpStream::from_std(std_tcp_stream_clone_one)?;
let new_tokio_tcp_stream_two = TcpStream::from_std(std_tcp_stream_clone_two)?;

SofwanCoder avatar Mar 15 '24 19:03 SofwanCoder

This method takes ownership of the original tokio TcpStream, which can be a problem in some places (for example, when the stream is a field of another struct).

QyInvoLing avatar Mar 25 '24 16:03 QyInvoLing