
Forwarding events to message broker / outbox table

Open tqwewe opened this issue 3 years ago • 4 comments

Firstly, thanks for this library. What you guys are doing is so valuable!

I've followed the book guide and implemented the bank service with Postgres for persistence. My next question is: how should events be forwarded to an event broker for other services to subscribe to?

I'm aware of the outbox pattern approach. Is there a recommended way to accomplish this? One simple way I can think of is to have a Query which forwards events: impl Query<BankAccount> for EventForwarder.

Thanks!

tqwewe, Oct 27 '21 16:10

My apologies for the late response. You're quite right: a new Query is the correct place to put this.

This might look something like:

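use async_trait::async_trait;
use cqrs_es::{EventEnvelope, Query};

// `KafkaProducer` and `KafkaRecord` are placeholders for whichever Kafka client you use.
struct KafkaEventForwarder {
    client: KafkaProducer,
}

#[async_trait]
impl Query<BankAccount> for KafkaEventForwarder {
    async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope<BankAccount>]) {
        for event in events {
            // Serialize only the domain event, not the surrounding envelope.
            let payload = serde_json::to_vec(&event.payload).expect("unable to serialize payload");
            // Keying by aggregate ID typically keeps one aggregate's events on the same partition.
            let record = KafkaRecord::new()
                .with_key(aggregate_id)
                .with_payload(&payload);
            // `dispatch` returns nothing, so a failed send has to be handled (logged, retried) here.
            self.client.send(record).await;
        }
    }
}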

serverlesstechnology, Jan 05 '22 15:01

A small follow-up question on this. I adopted this pattern to send the message to SNS. On the receiving end it looks like this:

{
  "Type" : "Notification",
  "MessageId" : "f5458878-95d1-5aad-82d9-d3d5edf775d9",
  "SequenceNumber" : "10000000000000027000",
  "TopicArn" : "arn:aws:sns:eu-central-1:xxx:dev-events.fifo",
  "Message" : "{\"CustomerDepositedMoney\":{\"amount\":1000.0,\"balance\":9000.0}}",
  "Timestamp" : "2022-09-27T22:15:02.834Z",
  "UnsubscribeURL" : "https://sns.eu-central-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-central-1:xxx:dev-events.fifo:aab561d9-73b3-4cdf-b527-9cbfa357eb47"
}

I'm using this as a playground for the events:

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum BankAccountEvent {
    AccountOpened {
        account_id: String,
    },
    CustomerDepositedMoney {
        amount: f64,
        balance: f64,
    },
    CustomerWithdrewCash {
        amount: f64,
        balance: f64,
    },
    CustomerWroteCheck {
        check_number: String,
        amount: f64,
        balance: f64,
    },
}

But how do I convert the incoming JSON {"CustomerDepositedMoney":{"amount":1000.0,"balance":9000.0}} back into an event?

CumpsD, Sep 29 '22 09:09

To answer my own question: Seems serde can do this out of the box :)

CumpsD, Sep 29 '22 09:09

@CumpsD that's correct! I'm leaning on serde to do a lot of the heavy lifting throughout this crate.

davegarred, Sep 29 '22 12:09