cqrs
Forwarding events to message broker / outbox table
Firstly, thanks for this library. What you guys are doing is so valuable!
I've followed the book guide and implemented the bank service with Postgres persistence. My next question is: how should events be forwarded to an event broker for other services to subscribe to?
I'm aware of the outbox pattern. Is there a recommended way to accomplish this?
One simple way I can think of is to have a Query which forwards events: impl Query<BankAccount> for EventForwarder.
Thanks!
My apologies for the late response. You're quite correct, a new Query is the correct place to put this.
This might look something like:
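use async_trait::async_trait;
use cqrs_es::{EventEnvelope, Query};

// KafkaProducer and KafkaRecord are placeholders for your Kafka client's types.
struct KafkaEventForwarder {
    client: KafkaProducer,
}

#[async_trait]
impl Query<BankAccount> for KafkaEventForwarder {
    async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope<BankAccount>]) {
        for event in events {
            // Serialize only the domain event, not the whole envelope.
            let payload = serde_json::to_vec(&event.payload).expect("unable to serialize payload");
            // Key each record by the aggregate ID.
            let record = KafkaRecord::new()
                .with_key(aggregate_id)
                .with_payload(&payload);
            self.client.send(record).await;
        }
    }
}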
A small follow-up question to this. I adopted this pattern to send the message to SNS. On the receiving end it looks like this:
{
  "Type" : "Notification",
  "MessageId" : "f5458878-95d1-5aad-82d9-d3d5edf775d9",
  "SequenceNumber" : "10000000000000027000",
  "TopicArn" : "arn:aws:sns:eu-central-1:xxx:dev-events.fifo",
  "Message" : "{\"CustomerDepositedMoney\":{\"amount\":1000.0,\"balance\":9000.0}}",
  "Timestamp" : "2022-09-27T22:15:02.834Z",
  "UnsubscribeURL" : "https://sns.eu-central-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-central-1:xxx:dev-events.fifo:aab561d9-73b3-4cdf-b527-9cbfa357eb47"
}
I'm using this as a playground for the events:
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum BankAccountEvent {
    AccountOpened {
        account_id: String,
    },
    CustomerDepositedMoney {
        amount: f64,
        balance: f64,
    },
    CustomerWithdrewCash {
        amount: f64,
        balance: f64,
    },
    CustomerWroteCheck {
        check_number: String,
        amount: f64,
        balance: f64,
    },
}
But how do I convert the incoming JSON {"CustomerDepositedMoney":{"amount":1000.0,"balance":9000.0}} back into an event?
To answer my own question: it seems serde can do this out of the box :)
@CumpsD that's correct! I'm leaning on serde to do a lot of the heavy lifting throughout this crate.