mux-stream icon indicating copy to clipboard operation
mux-stream copied to clipboard

(De)multiplex asynchronous streams

mux-stream

Continious integration Crates.io Docs.rs

This crate empahises the first-class nature of asynchronous streams in Rust by deriving the value construction & pattern matching operations from ADTs, depicted by the following correspondence:

ADTs Streams
Value construction Multiplexing
Pattern matching Demultiplexing

Table of contents

  • Installation
  • Motivation
  • Demultiplexing
  • Multiplexing
  • FAQ

Installation

Copy this into your Cargo.toml:

[dependencies]
mux-stream = "0.3"
tokio = "1.6"
tokio-stream = "0.1"
futures = "0.3"

Motivation

In many problem domains, we encounter the need to process incoming hierarchical structures. Suppose you're writing a social network, and the following kinds of updates might come at any moment:


In terms of Rust, you might want to express such updates via sum types:

enum UserReq {
    SendMsg(SendMsgReq),
    Follow(FollowReq),
    MuteFriend(MuteFriendReq)
}

enum SendMsgReq {
    Photo(...),
    Video(...),
    Text(...)
}

struct FollowReq {
    ...
}

enum MuteFriendReq {
    Forever(...),
    ForInterval(...)
}

This is where the story begins: now you need to process user requests. Let's formulate some general requirements of requests-processing code:

  • Conciseness. Avoid boilerplate where possible. Life is too short to write boilerplate code.
  • Single-responsibility principle (SRP). For our needs it means that each processor must be responsible for exactly one kind of request. No less and no more.
  • Compatible with other Rusty code. Our requests-processing solution must be able to be easily integrated into existing code bases.
  • Stay Rusty. eDSLs implemented via macros are fun, but be ready for confusing compilation errors when business logic is expressed in terms of such eDSLs. What is more, they are computer languages on their own -- it takes some time to become familiar with them.
  • Type safety. Do not spread the pain of upcasting/downcasting types you're already aware of.

This crate addresses all of the aforementioned requirements. The approach is based upon functional asynchronous dataflow programming: we augment asynchronous data streams with pattern matching. Your code would reflect the following structure (concerning with the example of a social network):


(Note the similarities with the chain-of-responsibility pattern.)

That is, each function takes a stream of updates and propagates (demultiplexes, pattern matches) them into processors of lower layers, and hence addressing the single-responsibility principle. What is more, you're able to use all the power of stream adaptors, which let you deal with updates as with chains, not as with single objects, declaratively.

The sections below are dedicated to demultiplexing and multiplexing separately. See also examples/admin_panel.rs, an elaborated demonstration of the most prominent aspects of the paradigm.

Demultiplexing

Given Stream<T1 | ... | Tn>, demultiplexing produces Stream<T1>, ..., Stream<Tn>. See the illustration below, in which every circle is an item of a stream and has a type (its colour):

That is, once an update from an input stream is available, it's pushed into the corresponding output stream in a separate Tokio task. No output stream can slow down another one.

Example

[examples/demux.rs]

use mux_stream::{demux, error_handler};

use futures::StreamExt;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::main]
async fn main() {
    #[derive(Debug)]
    enum MyEnum {
        A(i32),
        B(f64),
        C(&'static str),
    }

    let stream = tokio_stream::iter(vec![
        MyEnum::A(123),
        MyEnum::B(24.241),
        MyEnum::C("Hello"),
        MyEnum::C("ABC"),
        MyEnum::A(811),
    ]);

    let (i32_stream, f64_stream, str_stream) =
        demux!(MyEnum { A, B, C })(stream, error_handler::panicking());

    let mut i32_stream = UnboundedReceiverStream::new(i32_stream);
    let mut f64_stream = UnboundedReceiverStream::new(f64_stream);
    let mut str_stream = UnboundedReceiverStream::new(str_stream);

    assert_eq!(i32_stream.next().await, Some(123));
    assert_eq!(i32_stream.next().await, Some(811));
    assert_eq!(i32_stream.next().await, None);

    assert_eq!(f64_stream.next().await, Some(24.241));
    assert_eq!(f64_stream.next().await, None);

    assert_eq!(str_stream.next().await, Some("Hello"));
    assert_eq!(str_stream.next().await, Some("ABC"));
    assert_eq!(str_stream.next().await, None);
}

Multiplexing

Multiplexing is the opposite of demultiplexing: given Stream<T1>, ..., Stream<Tn>, it produces Stream<T1 | ... | Tn>. Again, the process is illustrated below:

That is, once an update from any input streams is available, it's pushed into the output stream. Again, this work is performed asynchronously in a separate Tokio task.

Example

[examples/mux.rs]

use mux_stream::{error_handler, mux};

use std::{collections::HashSet, iter::FromIterator};

use futures::StreamExt;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[derive(Debug)]
enum MyEnum {
    A(i32),
    B(u8),
    C(&'static str),
}

#[tokio::main]
async fn main() {
    let i32_values = HashSet::from_iter(vec![123, 811]);
    let u8_values = HashSet::from_iter(vec![88]);
    let str_values = HashSet::from_iter(vec!["Hello", "ABC"]);

    let result: UnboundedReceiver<MyEnum> = mux!(MyEnum { A, B, C })(
        tokio_stream::iter(i32_values.clone()),
        tokio_stream::iter(u8_values.clone()),
        tokio_stream::iter(str_values.clone()),
        error_handler::panicking(),
    );

    let (i32_results, u8_results, str_results) = UnboundedReceiverStream::new(result)
        .fold(
            (HashSet::new(), HashSet::new(), HashSet::new()),
            |(mut i32_results, mut u8_results, mut str_results), update| async move {
                match update {
                    MyEnum::A(x) => i32_results.insert(x),
                    MyEnum::B(x) => u8_results.insert(x),
                    MyEnum::C(x) => str_results.insert(x),
                };

                (i32_results, u8_results, str_results)
            },
        )
        .await;

    assert_eq!(i32_results, i32_values);
    assert_eq!(u8_results, u8_values);
    assert_eq!(str_results, str_values);
}

Hash sets are used here owing to the obvious absence of order preservation of updates from input streams.

FAQ

Q: Is only Tokio supported now?

A: Yes. I have no plans yet to support other asynchronous runtimes.