tokio
tokio copied to clipboard
Add `Stream` mock to `tokio-test`
Is your feature request related to a problem? Please describe.
I want to test streams wrapped in tokio_serde, but that's not currently possible.
Describe the solution you'd like
Make Action polymorphic in the transport type. That way, I can construct a Builder<Framed<...>> or a Builder<&[u8]> depending on my needs.
The mock type in tokio-test is written for the AsyncRead/AsyncWrite traits, not the Stream trait.
Ah, good point. Perhaps this is not the right place for such a feature request, then?
I don't mind adding some sort of Stream mock, but it would be a new type.
Sounds perfect.
Though I guess it would need to be written for the Stream and Sink traits, right?
You could do both. I'm also ok with something that starts with just Stream.
I'm interested in taking this on. So that I understand: the desired behavior for the mock Stream is the ability to dynamically insert Items during the test? Otherwise you could just use something like Stream::iter?
As for mocking a Sink , it seems like we'd need to track all values inserted with start_send (via a Vec?), and some way to set the next Poll state for associated poll_ methods?
It also seems like we might want sequence-building functionality like the current io::Builder has?
A rough example test snippet could be helpful here.
Right, we need it to do more than just Stream::iter if we want it to be useful, so it probably needs both the Stream and Sink impl.
A minimal builder would probably look like this, mirroring the existing builder.
impl Builder<T, U, E> {
fn next(&mut self, result: T);
fn send_ok(&mut self, item: U);
fn send_err(&mut self, error: E);
fn wait(&mut self, duration: Duration);
}
Of course, if you have better ideas, that's fine too.
The best example I can think of where this could be used is for e.g. testing code that communicates over a web socket. Such code would probably split it into two halves with StreamExt::split and have one task constantly read from it while another is writing. So we would need it to not panic if a read happens during a write, instead just return Poll::Pending until the write we want happens.
Digging into this some more, it looks like both Stream and Sink have some runtime requirements that should also be enforced:
- non-fused
Streamobjects have undefined behavior whennextis called after the stream closes, so we should probably panic if this happens in a test. - the various
poll_methods inSinkhave requirements about call order that should also be enforced. e.g.poll_readymust be called before each call tostart_send. It's strongly implied that not doing so is undefined behavior, so my gut reaction is we should attempt to detect violations of this rule too and panic if we find them.
As you mentioned in the previous comment, there's some awkwardness around the fact that Stream and Sink are theoretically decoupled. I'm toying with the idea of having separate action queues for each side, but still need to think through dependency use cases there (e.g. request/response protocols). As tokio_serde was the impetus for this change, I'm trying to imagine what tests might look like for its tiny example.
I think it would probably best to tie the Stream and Sink together instead of decoupling them.
Just to be clear, none of the things you mentioned can cause undefined behavior. That has a very specific meaning in Rust-land, which e.g. the poll_next documentation also points out:
However, as the
poll_nextmethod is not marked unsafe, Rust’s usual rules apply: calls must never cause undefined behavior (memory corruption, incorrect use of unsafe functions, or the like), regardless of the stream’s state.
The word for this kind of thing is unspecified behavior.
Ah, that makes sense. Thanks for the clarification. I have the Stream half ready. The Sink side is significantly more complicated, but I think I should be able to complete that in the next few days.
I'm puzzling out some additional behavior for waking, which is a bit more complex with a two-sided communication channel.
In particular, the documentation for .poll() on the Future trait specifically states:
on multiple calls to poll, only the Waker from the Context passed to the most recent call should be scheduled to receive a wakeup.
There is no such guidance for the poll methods on the Sink trait. I'm currently proceeding under the assumption that all calls to poll should replace a single waker stored in the sink state, similar to how poll works in Future. If that needs to be adjusted (e.g. different wakers per poll method), or if there is a better way to figure this out, let me know.
In general, multiple calls to any poll method should only wake the most recent waker. This includes sinks.
Ok, one final question: is .wait() necessary? Couldn't it be defined directly in the test by spawning a task that does a sleep and then sequences the next operation via the provided Handle?
Is the function useful enough that it should just be included anyway?
And finally, if it is necessary, I'm not seeing any tests for similar functionality in io. Is it ok to punt on that for now, or should we use something like mock_instant to create one?
I believe .wait() is useful because without it you don't really get any Poll::Pending return values in other situations.
Regarding tests that use time, you should not do that using mock_instant. The main Tokio crate already has utilities for mocking the tokio::time::Instant type.
Is this still open ? and is this a good begginer issue ?
It is indeed still open. You are welcome to give it a try!
Awesome! from where could I start ? I was reading this thread and i think i got it like 50% so i'm leaving here what i got it as my to-do list.
-
[ ] I would start creating a test function in tokio-stream/tests/stream_
.rs or like tokio/tests/io_write.rs -
[ ] Create a builder like there was mentioned in this thread for the stream or use tokio stream like this
tokio_stream::{self, StreamExt};
tokio stream
impl Builder<T, U, E> {
fn next(&mut self, result: T);
fn send_ok(&mut self, item: U);
fn send_err(&mut self, error: E);
fn wait(&mut self, duration: Duration);
}
mentioned builder
- [ ] Then add a vec of values for the execution of the stream
- [ ] assert if the values are there for the execution time like is done in stream_timeout.rs ?
I believe that the implementation would be very similar to the mock in tokio_test/io.rs. I would start by reading that for inspiration.
Okay, I think I'm getting somewere. How can i create a stream from a Mock type since its not iterable?
use futures_util::stream; // why not the use tokio_stream::{self as stream, StreamExt}; ?
/* .... */
#[tokio::test]
async fn stream_read(){
let mut mock: Stream = stream:: something_here ::from(Builder::new().read(b"hello ").read(b"world!").build());
let res: Poll<Option<...>::item> = mock.poll_next(); // here i'm trying to figure out how to get the read values
assert_eq!(res, b"hello ");
The Mock type itself should be a Stream. No conversion should be necessary.
@Grubba27, are you still working on this? If not, I would like to take it over.