rails_event_store
Respect subscribers defined order
From @gottfrois on June 20, 2016 11:36
When defining global and per-event subscribers, we should respect the order in which they are defined.
Today we trigger global subscribers after per event subscribers:
def all_subscribers_for(event_type)
  subscribers[event_type] + @global_subscribers
end
https://github.com/arkency/ruby_event_store/blob/master/lib/ruby_event_store/pub_sub/broker.rb#L50
My use case is the following:
- For all events, define a subscriber that broadcasts the event to a RabbitMQ broker
- For specific events, define subscribers that are basically Sagas (they trigger other events based on conditions)
::RailsEventStore::Client.new.tap do |es|
  es.subscribe_to_all_events(::Broadcaster.new)
  es.subscribe(::Consumers::Foo.new, [::Events::SomeEvent])
  es.subscribe(::Consumers::Bar.new, [::Events::SomeEvent])
end
I expected Broadcaster to be called first, but it is called last, after the Foo and Bar consumers.
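To make the ordering concrete, here is a minimal sketch of the concatenation quoted above. `SketchBroker` is a hypothetical stand-in, not the real ruby_event_store broker, and plain symbols stand in for subscriber objects and event classes.

```ruby
# Simplified stand-in for the broker (an assumption for illustration only).
class SketchBroker
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
    @global_subscribers = []
  end

  # Register a subscriber for specific event types.
  def subscribe(subscriber, event_types)
    event_types.each { |type| @subscribers[type] << subscriber }
  end

  # Register a subscriber for every event type.
  def subscribe_to_all_events(subscriber)
    @global_subscribers << subscriber
  end

  # Same concatenation as in the quoted snippet:
  # per-event subscribers first, global subscribers last.
  def all_subscribers_for(event_type)
    @subscribers.fetch(event_type, []) + @global_subscribers
  end
end

broker = SketchBroker.new
broker.subscribe_to_all_events(:broadcaster)
broker.subscribe(:foo, [:some_event])
broker.subscribe(:bar, [:some_event])

p broker.all_subscribers_for(:some_event)
# => [:foo, :bar, :broadcaster]
```

Even though `:broadcaster` is registered first, it ends up last, because the global list is always appended after the per-event list.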
Copied from original issue: RailsEventStore/ruby_event_store#22
From @mpraglowski on June 20, 2016 11:42
I would not rely on order of events. I would make the dependencies between handlers more explicit.
From @mpraglowski on June 20, 2016 11:44
BTW, could you share your implementation of the RabbitMQ broker?
And please check PRs #20 & #21 - this might affect your code soon ;)
From @gottfrois on June 20, 2016 11:54
Thanks for the PRs, dynamic subscription might indeed affect me. Regarding the broker, I have not implemented a broker other than the one defined in ruby_event_store.
I still want to be able to handle event subscriptions locally and simply broadcast to the outside world using RabbitMQ, so the easiest way was to subscribe to all events.
The thing is, order is important. In my case, different services (on different hosts) subscribe to those events, and one of the later handlers might trigger a new event that needs to happen after the one currently being handled.
From @gottfrois on November 23, 2016 9:13
For future reference, here is the approach I chose to handle event/consumer mapping:
module EventHandlers
  class Foo < ActiveJob::Base
    prepend EventConsumer
    consume Events::Something::Happened

    def perform(event)
      # do something with the event
    end
  end
end

module EventConsumer
  def self.prepended(base)
    base.extend ClassMethods
    base.class_eval do
      prepend ::AsyncYamlDeserializer
    end
  end

  module ClassMethods
    def consume(*values)
      event_store.subscribe(self, values)
    end

    def event_store
      Rails.application.config.event_store
    end
  end
end
Note: you'll need to eager load your application to make sure the event/consumer mapping is set up.
To solve the issue, we could have a global subscription_counter, one integer, stored per event store instance. That counter would be incremented with each subscribed handler.
# Current value of the counter is 0
event_store.subscribe(SomeHandler, to: [SomeEvent])
# We internally store {SomeEvent => [Subscription(SomeHandler, 0)]}; 0 was the value of the counter before incrementing
# Current value of the counter is 1
event_store.subscribe_to_all_events(GlobalHandler)
# We internally store the global handlers as [Subscription(GlobalHandler, 1)]; 1 was the value of the counter before incrementing
# Current value of the counter is 2
event_store.subscribe(OtherHandler, to: [SomeEvent])
# We internally store {SomeEvent => [Subscription(SomeHandler, 0), Subscription(OtherHandler, 2)]}; 2 was the value of the counter before incrementing
# Now, when executing handlers for SomeEvent, we have two lists to concatenate: [Subscription(SomeHandler, 0), Subscription(OtherHandler, 2)] and [Subscription(GlobalHandler, 1)]
# We then sort by that integer alone, which gives us the list of handlers in order of subscription
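The walkthrough above could be sketched roughly as follows. This is only an illustration of the counter idea under stated assumptions: `OrderedBroker`, `Subscription` and `next_subscription` are hypothetical names, not existing ruby_event_store classes or methods, and symbols stand in for handlers and event classes.

```ruby
# Hypothetical value object pairing a handler with the counter value
# that was current when it subscribed.
Subscription = Struct.new(:handler, :sequence)

class OrderedBroker
  def initialize
    @counter = 0 # one counter per broker (i.e. per event store instance)
    @subscriptions = Hash.new { |hash, key| hash[key] = [] }
    @global_subscriptions = []
  end

  def subscribe(handler, event_types)
    subscription = next_subscription(handler)
    event_types.each { |type| @subscriptions[type] << subscription }
  end

  def subscribe_to_all_events(handler)
    @global_subscriptions << next_subscription(handler)
  end

  # Concatenate both lists, then sort by the stored counter value so that
  # handlers come back in the order they were subscribed.
  def all_subscribers_for(event_type)
    (@subscriptions.fetch(event_type, []) + @global_subscriptions)
      .sort_by(&:sequence)
      .map(&:handler)
  end

  private

  # Store the counter value from before the increment, as in the walkthrough.
  def next_subscription(handler)
    subscription = Subscription.new(handler, @counter)
    @counter += 1
    subscription
  end
end

broker = OrderedBroker.new
broker.subscribe(:some_handler, [:some_event])
broker.subscribe_to_all_events(:global_handler)
broker.subscribe(:other_handler, [:some_event])

p broker.all_subscribers_for(:some_event)
# => [:some_handler, :global_handler, :other_handler]
```

Sorting happens at dispatch time here to keep the sketch short; a real implementation might prefer to keep the merged list sorted at subscription time instead.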
Possible issues:
- There is a single counter, and it lives in a long-running process. One could wonder whether it is a problem that this integer grows very large over time, but I think we can reasonably assume that, with Ruby's arbitrary-precision integers, it won't be, even for processes running for years.
- We need to decide what to do when the same handler is subscribed twice: whether to take the current value of the counter or keep the old one. I think we could start by keeping the old one, but I don't have a strong opinion on that.
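Both points can be illustrated with a small sketch. `SequenceRegistry` is a hypothetical helper, not part of any existing library, and it implements the "keep the old value" policy as one possible choice rather than a settled decision.

```ruby
# Hypothetical registry that hands out one sequence number per handler.
class SequenceRegistry
  def initialize
    @counter = 0
    @sequences = {}
  end

  # Returns the handler's sequence number. A handler that subscribes again
  # keeps the number it was given the first time, and the counter does not move.
  def register(handler)
    @sequences.fetch(handler) do
      sequence = @counter
      @counter += 1
      @sequences[handler] = sequence
    end
  end
end

registry = SequenceRegistry.new
p registry.register(:some_handler)   # => 0
p registry.register(:global_handler) # => 1
p registry.register(:some_handler)   # => 0 (old value kept)

# Ruby integers are arbitrary precision, so the counter cannot overflow.
huge = 2**64
p huge + 1 > huge # => true
```

Switching to the other policy (take the current counter value on re-subscription) would only mean assigning a fresh sequence number unconditionally instead of using `fetch`.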