sea-orm icon indicating copy to clipboard operation
sea-orm copied to clipboard

Event stream

Open billy1624 opened this issue 2 years ago • 10 comments

PR Info

  • Closes https://github.com/SeaQL/sea-orm/issues/1123

New Features

  • [ ]

Breaking Changes

  • [ ]

Changes

  • [ ]

billy1624 avatar Nov 21 '22 12:11 billy1624

https://github.com/jonhoo/bus might be a good option

tyt2y3 avatar Jan 02 '23 14:01 tyt2y3

Hmmmm I think we need an async broadcast channel. How about https://crates.io/crates/async-broadcast?

billy1624 avatar Jan 27 '23 12:01 billy1624

Design choice:

  1. Representing model values in HashMap
#[derive(Debug, Clone)]
pub struct Event {
    pub entity_type_id: TypeId,
    pub action: EventAction,
    pub values: HashMap<String, Value>,
}
  1. Receiver side: filter by entity and then unpack the values
let mut tokio_receiver = db.set_event_stream(async_broadcast::broadcast(10));

while let Ok(event) = tokio_receiver.recv().await {
    // Filter by entity
    if event.of_entity::<cake::Entity>() {
        // Unpack the values
        if let Some(val) = event.values.get(cake::Column::Name.as_str()) {
            todo!()
        }
    }
}

billy1624 avatar Jan 27 '23 15:01 billy1624

Hmmmm I think we need an async broadcast channel. How about https://crates.io/crates/async-broadcast?

use async_broadcast::{broadcast, TryRecvError, Receiver};
use futures_lite::{future::block_on, stream::StreamExt};
 
 
fn main() {
    block_on(async move {
        let (s1, mut r1) = broadcast(2);
        let s2 = s1.clone();
        let mut r2 = r1.clone();
    
        // Send 2 messages from two different senders.
        s1.broadcast(7).await.unwrap();
        s2.broadcast(8).await.unwrap();
    
        // Channel is now at capacity so sending more messages will result in an error.
        assert!(s2.try_broadcast(9).unwrap_err().is_full());
        assert!(s1.try_broadcast(10).unwrap_err().is_full());
    
        // We can use `recv` method of the `Stream` implementation to receive messages.
        assert_eq!(r1.next().await.unwrap(), 7);
        assert_eq!(r1.recv().await.unwrap(), 8);
        assert_eq!(r2.next().await.unwrap(), 7);
        assert_eq!(r2.recv().await.unwrap(), 8);
    
        // All receiver got all messages so channel is now empty.
        assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
        assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
    
        // Close one receiver, which closes all the sender and receivers.
        Receiver::close(&r1);
 
        println!("{}", s1.is_closed()); // prints True
 
        s1.broadcast(10).await.unwrap(); // thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SendError(..)', src/main.rs:34:32
        s2.broadcast(11).await.unwrap();
 
        println!("{}", r2.next().await.unwrap());
        println!("{}", r2.recv().await.unwrap());
 
        // println!("{}", r1.next().await.unwrap());
        // println!("{}", r1.recv().await.unwrap());
        
    })
}

There are 2 problems with this library

  1. when one the receiver is closed, it closed the channel, Hence no other receiver will get any event and sender wont be able to send event updates. Check after line 30.
  2. broadcast caches the messages ( number of cached messages can be set to 1 broadcast(cap = 2); ). tyt2y3 mentioned https://github.com/SeaQL/sea-orm/issues/1123#issue-1410534966 that Notes 1: We should not buffer events. When no one is subscribing, we will discard all events.. So future subscribers can get past event notifications.

--------------EDITED--------------

Verifying if a subscriber get's past events.

Cargo.toml futures = { version = "0.3", members=["futures-executor"] } matches with the dev-dependencies in sea-orm but not with the dependencies.

use async_broadcast::{broadcast};
use futures::executor::block_on;

fn main() {
    block_on(async move {
        let (s1, mut r1) = broadcast(2);
        let s2 = s1.clone();
        
        // Event 7 should be received by only r1.
        s1.broadcast(7).await.unwrap();

        // new subscriber after event 7.
        let mut r2 = r1.clone();
        
        // event 8 should be received by both r1 and r2
        s2.broadcast(8).await.unwrap();

        // We can use `recv` method to receive messages.
        assert_eq!(r1.recv().await.unwrap(), 7);
        assert_eq!(r1.recv().await.unwrap(), 8);
        
        // r2 should receive events 8 and onwards.
        assert_eq!(r2.recv().await.unwrap(), 8); // this panics, received event is 7 which is past event for r2, it should be 8
    })
}

Future subscribers receive past event messages. Maybe all the receivers are reading from the same queue with their own index pointer.

Diwakar-Gupta avatar Jan 30 '23 11:01 Diwakar-Gupta

@Diwakar-Gupta thank you for the investigation. Your example is easy to read!

Close one receiver, which closes all the sender and receivers.

According to the docs, this is not the intended usage:

When all Senders or all Receivers are dropped, the channel becomes closed. When a channel is closed, no more messages can be sent, but remaining messages can still be received.

Note that, dropping the Receiver is a different operation from manually closing it.

So future subscribers can get past event notifications.

Yes, I think this is a problem, although the example did not make it clear: we should clone r2 at the later point and verify the behaviour. Otherwise, I think it will be fine, because if the channel is closed, we will drop the Sender as well and do nothing.

tyt2y3 avatar Jan 31 '23 06:01 tyt2y3

Are you also interested in investigating the bus crate?

Apart from semantics, an additional requirement is to be compatible with all async runtime we support.

tyt2y3 avatar Jan 31 '23 06:01 tyt2y3

Hi @tyt2y3 I did some work with bus. About async-runtime I don't think bus uses async rather they depend on os-threads,. which is different from sea async support. Check this file https://github.com/jonhoo/bus/blob/main/src/lib.rs

Cargo.toml bus = "2.3.0"

fn main() {
    use bus::Bus;
    let mut tx = Bus::new(2); // len = 2
    let mut rx1 = tx.add_rx();
    
    tx.broadcast(1);

    let mut rx2 = tx.add_rx(); // should not receive 1
    tx.broadcast(2);
    
    assert_eq!(rx1.recv(), Ok(1));
    assert_eq!(rx1.recv(), Ok(2));

    // rx2 don't receive past events
    assert_eq!(rx2.recv(), Ok(2));

    drop(rx2);

    tx.broadcast(3);
    assert_eq!(rx1.recv(), Ok(3));

    // send more events than len and don't consume value
    let mut rx3 = tx.add_rx();
    tx.broadcast(4);
    tx.broadcast(5);
    assert_eq!(tx.try_broadcast(6), Err(6));

    // 1 receiver consume's value 
    assert_eq!(rx3.recv(), Ok(4));
    assert_eq!(tx.try_broadcast(6), Err(6));

    // both receiver consumes value
    assert_eq!(rx1.recv(), Ok(4));
    assert_eq!(tx.try_broadcast(6), Ok(()));
}

My observation

  1. broadcast works perfectly fine with multiple receivers
  2. past event is not send to receiver.
  3. Drop works fine.
  4. when Bus::new(len = 2); len number of messages are send but not consumed yet broadcast will wait till eternity for all the receivers to consume at least one event value. In the official doc they have mentioned to have larger len. read fn new https://docs.rs/bus/2.3.0/bus/struct.Bus.html

One other think I wanted to ask should we consider the way javascript web handles event, it takes a function ( function or object in our case ) onclick(function) and calls it when event occurs.

Diwakar-Gupta avatar Jan 31 '23 10:01 Diwakar-Gupta

Hey @Diwakar-Gupta, thanks for the experiments!! I didn't have time to do it myself just yet. So, glad you helped :))

  1. when one the receiver is closed, it closed the channel, Hence no other receiver will get any event and sender wont be able to send event updates. Check after line 30.

One shouldn't invoke Receiver::close() method. A Receiver will be dropped when it goes out of scope as defined in impl Drop for Receiver<T>. We want the sender keep alive when there is no receiver but I'm not sure the behaviour of it.

  1. broadcast caches the messages ( number of cached messages can be set to 1 broadcast(cap = 2); ). tyt2y3 mentioned [PIP] Event stream #1123 (comment) that Notes 1: We should not buffer events. When no one is subscribing, we will discard all events.. So future subscribers can get past event notifications.

I found that we cannot spawn a new receiver simply by calling .clone(). Use new_receiver() instead.

Also, can we try .set_await_active() to false? It's true by default. With that set, hopefully it won't "cache" past message (event).

billy1624 avatar Jan 31 '23 11:01 billy1624

Some references:

  • https://docs.rs/async-broadcast/0.5.0/async_broadcast/index.html#difference-with-other-broadcast-crates
  • https://tokio.rs/tokio/tutorial/channels#tokios-channel-primitives

billy1624 avatar Jan 31 '23 11:01 billy1624

@billy1624 @tyt2y3 Have a look at crossbeam-channel it provides mpmc feature and is also merged in rust's standard library here is pr . This can be a good option. This is added in rust version 1.67 https://github.com/rust-lang/rust/releases/tag/1.67.0.

Diwakar-Gupta avatar Feb 03 '23 07:02 Diwakar-Gupta