[PIP] Event stream

Open tyt2y3 opened this issue 3 years ago • 12 comments

No pun intended, PIP stands for "Proposal & Implementation Plan".

Motivation

To generate a high-level event stream of changes to Entities, serving as a foundation for change capture and other event-based paradigms.

Architecture

When this flag is enabled, DatabaseConnection will maintain a broadcast channel internally (is there a non-tokio equivalent?), and users will be able to subscribe to this channel via subscribe(&self) -> Receiver<Event>.

If execution succeeds, Inserter, Updater & Deleter will extract some semantic information from the query, generate an Event, and push it to the channel.

Information we can extract and represent:

  1. New Model: the newly inserted ID and content of the new Model
  2. Update Model: ID of the model, which fields are changed and the updated values
  3. Delete Model: ID of the deleted model, and perhaps content of the deleted Model
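As a rough illustration of the three kinds of information listed above, the event could be an enum with one variant per change. This is only a sketch: the `Event` variants, their field names, and the `Value` stand-in (SeaORM itself uses `sea_query::Value`) are assumptions, not part of the proposal.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a column value (assumption, not the SeaORM type).
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
    Int(i64),
    Text(String),
}

/// Hypothetical event type: one variant per kind of change.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
    /// New Model: the inserted ID and the full content of the row.
    Insert { table: String, id: Value, model: BTreeMap<String, Value> },
    /// Update Model: the ID plus only the columns that changed.
    Update { table: String, id: Value, changed: BTreeMap<String, Value> },
    /// Delete Model: the ID and, optionally, the content the row had.
    Delete { table: String, id: Value, model: Option<BTreeMap<String, Value>> },
}

impl Event {
    /// Table the event refers to, regardless of variant.
    pub fn table(&self) -> &str {
        match self {
            Event::Insert { table, .. }
            | Event::Update { table, .. }
            | Event::Delete { table, .. } => table.as_str(),
        }
    }
}
```

Keeping the table name on every variant would let a downstream component filter events without inspecting the payload.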

Note 1: We should not buffer events. When no one is subscribing, we will discard all events.

Note 2: I imagine a subscriber could persist these events, maybe even saving them to the same database. To prevent an infinite echo, we should have an option to ignore certain tables.
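The two notes can be sketched with the standard library alone. The `EventHub` type, its `publish` method, and the simplified `Event` struct are hypothetical names for illustration; `std::sync::mpsc` stands in for a real broadcast channel, and unlike tokio's bounded broadcast channel, each subscriber's queue here is unbounded.

```rust
use std::collections::HashSet;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Simplified, hypothetical event: a table name plus an action label.
#[derive(Clone, Debug, PartialEq)]
pub struct Event {
    pub table: String,
    pub action: String,
}

/// Hypothetical hub that a connection could own (assumption, not SeaORM API).
pub struct EventHub {
    subscribers: Mutex<Vec<Sender<Event>>>,
    ignored_tables: HashSet<String>,
}

impl EventHub {
    /// `ignored_tables` lists tables whose changes never produce events.
    pub fn new(ignored_tables: &[&str]) -> Self {
        Self {
            subscribers: Mutex::new(Vec::new()),
            ignored_tables: ignored_tables.iter().map(|t| t.to_string()).collect(),
        }
    }

    /// Mirrors the proposed `subscribe(&self) -> Receiver<Event>`.
    pub fn subscribe(&self) -> Receiver<Event> {
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Sends the event to every live subscriber and returns how many got it.
    pub fn publish(&self, event: Event) -> usize {
        // Note 2: skip ignored tables so a subscriber that writes events
        // back to the same database does not echo forever.
        if self.ignored_tables.contains(&event.table) {
            return 0;
        }
        let mut subs = self.subscribers.lock().unwrap();
        // Note 1: nothing is stored by the hub itself. Senders whose receiver
        // was dropped are removed; with no subscribers the event is discarded.
        subs.retain(|tx| tx.send(event.clone()).is_ok());
        subs.len()
    }
}
```

A subscriber that persists events into, say, an `event_log` table would pass that table name to `new` so its own inserts are not reported back to it.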

Breaking changes

It seems that we need to convert DatabaseConnection from an enum to a struct.

Open questions

  1. How to allow custom SQL execution to generate events
  2. How to allow custom processors to handle InsertStatement and UpdateStatement

tyt2y3 avatar Oct 16 '22 16:10 tyt2y3

Question: since broadcast channel is generic, what will be the type? It could be dyn ModelTrait, but is it possible to determine which model it actually is?

teenjuna avatar Oct 25 '22 01:10 teenjuna

I think async-channel would be a good alternative.

billy1624 avatar Nov 21 '22 12:11 billy1624

Hey @teenjuna, I think the generic is on the message channel, not on the message itself. You can check https://github.com/SeaQL/sea-orm/pull/1239 to see how I create a broadcast channel on top of any supported message channel. More specifically, see the EventStream trait.

billy1624 avatar Nov 21 '22 12:11 billy1624

Would this be for all updates occurring in the database or per table? Being able to subscribe to updates only in one table would solve the issue @teenjuna mentioned, and also make handling the messages easier (assuming one is only interested in updates from a single table).

netthier avatar Jan 23 '23 12:01 netthier

I think a downstream component would be responsible for filtering and forwarding messages to interested receivers, so here the design should focus on the source of truth.

tyt2y3 avatar Jan 23 '23 15:01 tyt2y3

Will it work with delete events caused by "cascade delete"? So if I delete an entity and related entities are deleted automatically by SeaORM, can I get the deleted related entities? This would be useful, for example, for cloud applications (e.g. Nextcloud): if a user is deleted, all file entities are deleted as well, but the actual files stored in the normal FS on the host need to be deleted too.

TheCataliasTNT2k avatar Jan 25 '23 15:01 TheCataliasTNT2k

Will it work with delete events caused by "cascade delete"?

I think not. This is a feature inside the database, and if you used it, it is meant to be transparent to the application.

tyt2y3 avatar Jan 26 '23 07:01 tyt2y3

There is a fundamental design choice we have to make. How can the event subscriber decode the event? For example, which entity does this CRUD event belong to, and what has changed?

I think we need a trimmed ModelTrait where only basic getters are kept.

struct ModelEvent {
    table_name: DynIden,
    action_type: ModelEventType,
    model: Box<dyn BasicModelTrait>,
}

enum ModelEventType {
    Insert,
    Update,
    Delete,
}

trait BasicModelTrait {
    /// Get the value of a column.
    /// 
    /// Here the `col` is not bounded by `<Self::Entity as EntityTrait>::Column`
    fn get(&self, col: DynIden) -> Value;
}

The original ModelTrait can't be used as Box<dyn ModelTrait>; it can only be written as Box<dyn ModelTrait<Entity = T>>, which limits ModelEvent to representing an event of one specific entity rather than of any entity in general.

#[async_trait]
pub trait ModelTrait: Clone + Send + Debug + BasicModelTrait { // Every `ModelTrait` implementor should also implement `BasicModelTrait`
    // Need to provide generic / concrete type for this associated type
    type Entity: EntityTrait;

    // ...
}
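To make the type-erasure idea concrete, here is a small self-contained sketch. All types are simplified stand-ins (no `async_trait`, a string column name instead of `DynIden`, a local `Value` enum), and the `erase` helper and the `Cake` model are hypothetical.

```rust
use std::fmt::Debug;

/// Hypothetical stand-in for `sea_query::Value`.
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
    Int(i64),
    Text(String),
    Null,
}

/// Trimmed trait: no associated types, so `Box<dyn BasicModelTrait>` is allowed.
pub trait BasicModelTrait: Debug {
    fn get(&self, col: &str) -> Value;
}

/// Simplified stand-in for `EntityTrait`.
pub trait EntityTrait {
    fn table_name() -> &'static str;
}

/// Simplified stand-in for the full trait; the associated type ties a model
/// to exactly one entity, which is what prevents a general `dyn ModelTrait`.
pub trait ModelTrait: Clone + Debug + BasicModelTrait {
    type Entity: EntityTrait;
}

pub struct CakeEntity;

impl EntityTrait for CakeEntity {
    fn table_name() -> &'static str {
        "cake"
    }
}

#[derive(Clone, Debug)]
pub struct Cake {
    pub id: i64,
    pub name: String,
}

impl BasicModelTrait for Cake {
    fn get(&self, col: &str) -> Value {
        match col {
            "id" => Value::Int(self.id),
            "name" => Value::Text(self.name.clone()),
            _ => Value::Null,
        }
    }
}

impl ModelTrait for Cake {
    type Entity = CakeEntity;
}

/// Erases the concrete model type but keeps the table name alongside it.
pub fn erase<M>(model: M) -> (&'static str, Box<dyn BasicModelTrait>)
where
    M: ModelTrait + 'static,
{
    let boxed: Box<dyn BasicModelTrait> = Box::new(model);
    (<M::Entity as EntityTrait>::table_name(), boxed)
}
```

The generic `erase` function is the only place that knows the concrete entity; everything downstream sees just a table name and a column getter.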

billy1624 avatar Jan 26 '23 11:01 billy1624

Will it be possible to implement a generic TryFrom to convert BasicModelTrait into a concrete model type? Asking from a perspective of good DX.

teenjuna avatar Jan 26 '23 16:01 teenjuna

Okay. This won't work... the data transported via broadcast channel has to be Clone.

// Has to be `Clone`
trait BasicModelTrait: Clone { ... }

struct ModelEvent {
    table_name: DynIden,
    action_type: ModelEventType,
    model: Box<dyn BasicModelTrait>, // Violation of object safety!
}

billy1624 avatar Jan 27 '23 13:01 billy1624

Will it be possible to implement a generic TryFrom to convert BasicModelTrait into a concrete model type? Asking from a perspective of good DX.

Yes, it could!
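One possible shape for such a conversion, as a sketch only: the `ConversionError` type, the `cake_from_event` helper, the string column names, and the local `Value` enum are all assumptions made for illustration.

```rust
use std::convert::TryFrom;

/// Hypothetical stand-in for `sea_query::Value`.
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
    Int(i64),
    Text(String),
    Null,
}

/// Trimmed, object-safe getter trait as proposed in the thread.
pub trait BasicModelTrait {
    fn get(&self, col: &str) -> Value;
}

#[derive(Clone, Debug, PartialEq)]
pub struct Cake {
    pub id: i64,
    pub name: String,
}

impl BasicModelTrait for Cake {
    fn get(&self, col: &str) -> Value {
        match col {
            "id" => Value::Int(self.id),
            "name" => Value::Text(self.name.clone()),
            _ => Value::Null,
        }
    }
}

/// Hypothetical error: names the column that was missing or mistyped.
#[derive(Debug, PartialEq)]
pub struct ConversionError(pub &'static str);

impl<'a> TryFrom<&'a dyn BasicModelTrait> for Cake {
    type Error = ConversionError;

    /// Rebuilds a concrete `Cake` by reading each expected column.
    fn try_from(model: &'a dyn BasicModelTrait) -> Result<Self, Self::Error> {
        let id = match model.get("id") {
            Value::Int(v) => v,
            _ => return Err(ConversionError("id")),
        };
        let name = match model.get("name") {
            Value::Text(v) => v,
            _ => return Err(ConversionError("name")),
        };
        Ok(Cake { id, name })
    }
}

/// Convenience wrapper a subscriber might call on an event payload.
pub fn cake_from_event(model: &dyn BasicModelTrait) -> Option<Cake> {
    Cake::try_from(model).ok()
}
```

In practice an impl like this would presumably be generated per model by a derive macro rather than written by hand.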

billy1624 avatar Jan 27 '23 13:01 billy1624

Okay. This won't work... the data transported via broadcast channel has to be Clone.

// Has to be `Clone`
trait BasicModelTrait: Clone { ... }

struct ModelEvent {
    table_name: DynIden,
    action_type: ModelEventType,
    model: Box<dyn BasicModelTrait>, // Violation of object safety!
}

Maybe this crate can be used? https://github.com/dtolnay/dyn-clone
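For reference, this is the hand-written version of the pattern: a `clone_box` method replaces the `Clone` supertrait, so the trait stays object safe while `Box<dyn BasicModelTrait>` still implements `Clone`. The dyn-clone crate packages the same idea behind its `DynClone` trait and `clone_trait_object!` macro. Types below are simplified stand-ins (a `String` table name instead of `DynIden`, a local `Value` enum), and `Cake` is a hypothetical model.

```rust
use std::fmt::Debug;

/// Hypothetical stand-in for `sea_query::Value`.
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
    Int(i64),
    Null,
}

/// Object safe: cloning goes through `clone_box`, not a `Clone` supertrait.
pub trait BasicModelTrait: Debug + Send {
    fn get(&self, col: &str) -> Value;
    fn clone_box(&self) -> Box<dyn BasicModelTrait>;
}

/// Makes the boxed trait object itself `Clone`.
impl Clone for Box<dyn BasicModelTrait> {
    fn clone(&self) -> Self {
        (**self).clone_box()
    }
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ModelEventType {
    Insert,
    Update,
    Delete,
}

/// Now derivable as `Clone`, which a broadcast channel payload requires.
#[derive(Clone, Debug)]
pub struct ModelEvent {
    pub table_name: String,
    pub action_type: ModelEventType,
    pub model: Box<dyn BasicModelTrait>,
}

#[derive(Clone, Debug)]
pub struct Cake {
    pub id: i64,
}

impl BasicModelTrait for Cake {
    fn get(&self, col: &str) -> Value {
        match col {
            "id" => Value::Int(self.id),
            _ => Value::Null,
        }
    }

    fn clone_box(&self) -> Box<dyn BasicModelTrait> {
        Box::new(self.clone())
    }
}
```

The cost is one extra method per model, which a blanket impl or the crate's macro would normally hide.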

teenjuna avatar Jan 27 '23 13:01 teenjuna