callbag-rs icon indicating copy to clipboard operation
callbag-rs copied to clipboard

Rust implementation of the callbag spec for reactive/iterable programming

callbag-rs

Rust implementation of the callbag spec for reactive/iterable programming.

Basic callbag factories and operators to get started with.

Highlights:

  • Supports reactive stream programming
  • Supports iterable programming (also!)
  • Same operator works for both of the above
  • Extensible

Imagine a hybrid between an Observable and an (Async)Iterable, that's what callbags are all about. It's all done with a few simple callbacks, following the callbag spec.

CI Crates.io Documentation MIT OR Apache-2.0 licensed

Examples

Reactive programming examples

Pick the first 5 odd numbers from a clock that ticks every second, then start observing them:

use async_nursery::Nursery;
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};

use callbag::{filter, for_each, interval, map, pipe, take};

let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);

let actual = Arc::new(SegQueue::new());

pipe!(
    interval(Duration::from_millis(1_000), nursery.clone()),
    map(|x| x + 1),
    filter(|x| x % 2 == 1),
    take(5),
    for_each({
        let actual = Arc::clone(&actual);
        move |x| {
            println!("{x}");
            actual.push(x);
        }
    }),
);

drop(nursery);
async_std::task::block_on(nursery_out);

assert_eq!(
    &{
        let mut v = vec![];
        while let Some(x) = actual.pop() {
            v.push(x);
        }
        v
    }[..],
    [1, 3, 5, 7, 9]
);

Iterable programming examples

From a range of numbers, pick 5 of them and divide them by 4, then start pulling those one by one:

use crossbeam_queue::SegQueue;
use std::sync::Arc;

use callbag::{for_each, from_iter, map, pipe, take};

#[derive(Clone)]
struct Range {
    i: usize,
    to: usize,
}

impl Range {
    fn new(from: usize, to: usize) -> Self {
        Range { i: from, to }
    }
}

impl Iterator for Range {
    type Item = usize;

    fn next(&mut self) -> Option<Self::Item> {
        let i = self.i;
        if i <= self.to {
            self.i += 1;
            Some(i)
        } else {
            None
        }
    }
}

let actual = Arc::new(SegQueue::new());

pipe!(
    from_iter(Range::new(40, 99)),
    take(5),
    map(|x| x as f64 / 4.0),
    for_each({
        let actual = Arc::clone(&actual);
        move |x| {
            println!("{x}");
            actual.push(x);
        }
    }),
);

assert_eq!(
    &{
        let mut v = vec![];
        while let Some(x) = actual.pop() {
            v.push(x);
        }
        v
    }[..],
    [10.0, 10.25, 10.5, 10.75, 11.0]
);

Ok::<(), Box<dyn std::error::Error>>(())

API

The list below shows what's included.

Source factories

Sink factories

Transformation operators

Filtering operators

Combination operators

Utilities

Terminology

  • source: a callbag that delivers data
  • sink: a callbag that receives data
  • puller sink: a sink that actively requests data from the source
  • pullable source: a source that delivers data only on demand (on receiving a request)
  • listener sink: a sink that passively receives data from the source
  • listenable source: source which sends data to the sink without waiting for requests
  • operator: a callbag based on another callbag which applies some operation

License

Licensed under either of

  • Apache License, Version 2.0, (LICENSE-APACHE or https://www.apache.org/licenses/LICENSE-2.0)
  • MIT license (LICENSE-MIT or https://opensource.org/licenses/MIT)

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

Acknowledgements

Thanks to André Staltz (@staltz) for creating the callbag spec.

This library is a port of https://github.com/staltz/callbag-basics. Some inspiration was taken from https://github.com/f5io/callbag.rs.

Many thanks to the awesome folks on the Rust Users Forum for their help, especially: