crossbeam icon indicating copy to clipboard operation
crossbeam copied to clipboard

question - can i filter messages such that they will arrive to certain recipients only and how can receivers be async without blocking their current thread

Open avnerbarr opened this issue 3 years ago • 0 comments

I want to create a "Notification center" to facilitate decoupled message passing between parts of my application i.e. similar to https://developer.apple.com/documentation/foundation/nsnotificationcenter

I couldn't find an existing solution (maybe you know of one) so started to implement on my own and hit some challenges.

mainly,

the receiver should be async and currently I am spawning threads like crazy and moving the receiver for one of messages in many cases.

the second issue, I'm creating a lot of channels when I could be multiplexing instead on a shared channel.

the issue though is how to "divert"/"filter" the messages to the correct recipients.

Here is some play code I have in place which partially works but suffers from the caveats I mentioned above

lazy_static! {
static ref NOTIFICATION_CALLBACKS: Mutex<HashMap<String, SenderReceiverPair>> =
        Mutex::new(HashMap::new());
}

type SenderReceiverPair = (
    crossbeam::channel::Sender<NotificationPayload>,
    crossbeam::channel::Receiver<NotificationPayload>,
);

#[derive(Clone, Debug)]
pub struct NotificationPayload {
    pub notification_id: String,
    pub data: Option<serde_json::Value>,
}


pub fn register(
    notification_id: String,
) -> crossbeam::channel::Receiver<NotificationPayload> {
    if let Some((_, rec)) = NOTIFICATION_CALLBACKS
        .lock()
        .unwrap()
        .get(notification_id.as_str())
    {
        return rec.clone(); // return a clone of an existing receiver
    }

    let (sender, reciever) = crossbeam::channel::unbounded::<NotificationPayload>();
    NOTIFICATION_CALLBACKS
        .lock()
        .unwrap()
        .insert(notification_id, (sender, reciever.clone())); // re-use the channel if another observer wants to register
    reciever
}

pub fn post_notification(notification_id: &str, data: Option<serde_json::Value>) {
    if let Some((sender, _)) = NOTIFICATION_CALLBACKS.lock().unwrap().get(notification_id) {
        let _ = sender.send(NotificationPayload {
            notification_id: notification_id.to_string(),
            data,
        });
    }
}



// example usage

fn do_something() {
        // register for some notification non-blocking style
        let receiver = notifications::register(some_id);
        std::thread::spawn(move || {
            for res in receiver.iter() {
                info!("got notified for {:?}", res);
            }
// how to kill the reciever ? how to kill the thread? do I really need this one off thread?
        });

   // continue with more work
...
}


// somewhere else posting notification

fn some_other_module_far_away() {
   notifications::post(some_id, some_json_payload_with_is_used_by_recievers);
}
    

What I want (pseudo)

notifications::register(some_notification_name, some_callback);

notifications::post(some_notification_name, some_notifications_data);

avnerbarr avatar Jul 28 '21 08:07 avnerbarr