Respect subscribers defined order

Open mostlyobvious opened this issue 8 years ago • 5 comments

From @gottfrois on June 20, 2016 11:36

When defining global and per-event subscribers, we should respect the order in which they are defined.

Today we trigger global subscribers after per-event subscribers:

def all_subscribers_for(event_type)
  subscribers[event_type] + @global_subscribers
end

https://github.com/arkency/ruby_event_store/blob/master/lib/ruby_event_store/pub_sub/broker.rb#L50

My use case is the following:

  • For all events, define a subscriber that broadcasts the event to a RabbitMQ broker
  • For specific events, define subscribers that are basically Sagas (they trigger other events based on conditions)

::RailsEventStore::Client.new.tap do |es|
  es.subscribe_to_all_events(::Broadcaster.new)
  es.subscribe(::Consumers::Foo.new, [::Events::SomeEvent])
  es.subscribe(::Consumers::Bar.new, [::Events::SomeEvent])
end

I expected Broadcaster to be called first, but it is called at the end, after the Foo and Bar consumers.
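
The ordering described above can be reproduced outside the library with a minimal sketch. `MiniBroker` and `Recorder` are hypothetical stand-ins, not ruby_event_store classes; only `all_subscribers_for` mirrors the snippet quoted earlier.

```ruby
# Hypothetical stand-in for the broker. Only all_subscribers_for mirrors
# the library snippet quoted in this issue; everything else is assumed.
class MiniBroker
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
    @global_subscribers = []
  end

  def subscribe(subscriber, event_types)
    event_types.each { |type| @subscribers[type] << subscriber }
  end

  def subscribe_to_all_events(subscriber)
    @global_subscribers << subscriber
  end

  def notify(event_type)
    all_subscribers_for(event_type).each { |s| s.call(event_type) }
  end

  private

  # Per-event subscribers first, global subscribers appended afterwards.
  def all_subscribers_for(event_type)
    @subscribers[event_type] + @global_subscribers
  end
end

# Hypothetical handler that appends its own name to a shared log when called.
Recorder = Struct.new(:name, :log) do
  def call(_event_type)
    log << name
  end
end

log = []
broker = MiniBroker.new
broker.subscribe_to_all_events(Recorder.new(:broadcaster, log))
broker.subscribe(Recorder.new(:foo, log), [:some_event])
broker.subscribe(Recorder.new(:bar, log), [:some_event])
broker.notify(:some_event)

p log # the global subscriber runs last even though it was registered first
```

With this stand-in the log ends up as `[:foo, :bar, :broadcaster]`, which matches the behaviour reported here.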

Copied from original issue: RailsEventStore/ruby_event_store#22

mostlyobvious avatar Oct 05 '17 15:10 mostlyobvious

From @mpraglowski on June 20, 2016 11:42

I would not rely on the order of events. I would make the dependencies between handlers more explicit.

mostlyobvious avatar Oct 05 '17 15:10 mostlyobvious

From @mpraglowski on June 20, 2016 11:44

BTW, could you share your implementation of the RabbitMQ broker?

And please check PRs #20 & #21 - this might affect your code soon ;)

mostlyobvious avatar Oct 05 '17 15:10 mostlyobvious

From @gottfrois on June 20, 2016 11:54

Thanks for the PRs, dynamic subscription might affect me indeed. Regarding the broker, I have not implemented a different broker from the one defined in ruby event store.

I still want to be able to do event subscriptions locally and simply broadcast in the wild using RabbitMQ, so the easiest way was to have a subscribe_all.

The thing is, order is important. In my case, different services (on different hosts) subscribe to those events, and one of the last handlers might trigger a new event that needs to happen after the one currently in the loop.

mostlyobvious avatar Oct 05 '17 15:10 mostlyobvious

From @gottfrois on November 23, 2016 9:13

For future reference, here is the approach I chose to handle event/consumer mapping:

module EventHandlers
  class Foo < ActiveJob::Base
    prepend EventConsumer

    consume Events::Something::Happened

    def perform(event)
      # do something with the event
    end
  end
end

module EventConsumer
  def self.prepended(base)
    base.extend ClassMethods
    base.class_eval do
      prepend ::AsyncYamlDeserializer
    end
  end

  module ClassMethods
    def consume(*values)
      event_store.subscribe(self, values)
    end

    def event_store
      Rails.application.config.event_store
    end
  end
end

Note: you'll need to eager load your application to make sure the event/consumer mapping is set up, because `consume` only runs when the handler class is loaded.
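
A plain-Ruby sketch of why eager loading matters: the subscription is registered as a side effect of evaluating the class body. `FakeEventStore`, `EVENT_STORE`, `FooHandler` and `SomethingHappened` are hypothetical names used only for illustration; the ActiveJob and AsyncYamlDeserializer parts of the code above are left out.

```ruby
# Hypothetical in-memory store that only remembers who subscribed to what.
class FakeEventStore
  attr_reader :subscriptions

  def initialize
    @subscriptions = []
  end

  def subscribe(handler, event_types)
    @subscriptions << [handler, event_types]
  end
end

# Stand-in for Rails.application.config.event_store (assumption).
EVENT_STORE = FakeEventStore.new

module EventConsumer
  # Called by Ruby when a class prepends this module.
  def self.prepended(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def consume(*event_types)
      event_store.subscribe(self, event_types)
    end

    def event_store
      EVENT_STORE
    end
  end
end

SomethingHappened = Class.new

# Nothing is subscribed until this class definition is evaluated.
subscriptions_before_load = EVENT_STORE.subscriptions.size

class FooHandler
  prepend EventConsumer

  consume SomethingHappened

  def perform(event)
    # do something with the event
  end
end

subscriptions_after_load = EVENT_STORE.subscriptions.size
```

If the file defining `FooHandler` is never loaded, `consume` never runs and the handler is silently missing from the mapping.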

mostlyobvious avatar Oct 05 '17 15:10 mostlyobvious

To solve the issue, we could have a global subscription_counter, one integer, stored per event store instance. That counter would be incremented with each subscribed handler.

# Current value of the counter is 0
event_store.subscribe(SomeHandler, to: [SomeEvent])
# Internally we store { SomeEvent => [ Subscription(SomeHandler, 0) ] }; 0 was the value of the counter before incrementing

# Current value of the counter is 1
event_store.subscribe_to_all_events(GlobalHandler)
# Internally we store the global handlers as [ Subscription(GlobalHandler, 1) ]; 1 was the value of the counter before incrementing

# Current value of the counter is 2
event_store.subscribe(OtherHandler, to: [SomeEvent])
# Internally we store { SomeEvent => [ Subscription(SomeHandler, 0), Subscription(OtherHandler, 2) ] }; 2 was the value of the counter before incrementing

# Now, when executing handlers for SomeEvent, we have two lists to concatenate: [ Subscription(SomeHandler, 0), Subscription(OtherHandler, 2) ] and [ Subscription(GlobalHandler, 1) ]
# We sort the concatenated list by that integer alone and get the handlers in order of subscription

Possible issues:

  • There is a single counter, and it lives in a long-running process. One might worry that this integer grows too big over time, but since Ruby integers are arbitrary-precision, I think we can reasonably assume it won't be a problem, even for processes running for years.
  • We need to decide what to do when the same handler is subscribed twice: either take the current value of the counter or keep the old one. I think we could start with keeping the old one, but I don't have a strong opinion on that.
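
The proposal can be sketched in plain Ruby as follows. This is only an illustration under stated assumptions: `OrderedBroker` and `Subscription` are hypothetical names, not ruby_event_store classes; a duplicate subscription keeps its old position, as suggested above; and one `subscribe` call uses a single counter value for all its event types.

```ruby
# Hypothetical pairing of a handler with the counter value at subscription time.
Subscription = Struct.new(:handler, :position)

# Hypothetical broker illustrating the counter idea; not the library's broker.
class OrderedBroker
  def initialize
    @counter = 0
    @subscriptions = Hash.new { |hash, key| hash[key] = [] }
    @global_subscriptions = []
  end

  def subscribe(handler, to:)
    position = next_position
    to.each { |event_type| add(@subscriptions[event_type], handler, position) }
  end

  def subscribe_to_all_events(handler)
    add(@global_subscriptions, handler, next_position)
  end

  # Merge per-event and global subscriptions, then order by counter value.
  def all_subscribers_for(event_type)
    (@subscriptions.fetch(event_type, []) + @global_subscriptions)
      .sort_by(&:position)
      .map(&:handler)
  end

  private

  # Returns the counter value before incrementing it.
  def next_position
    position = @counter
    @counter += 1
    position
  end

  # A handler already present in the list keeps its old position.
  def add(list, handler, position)
    return if list.any? { |subscription| subscription.handler.equal?(handler) }

    list << Subscription.new(handler, position)
  end
end

broker = OrderedBroker.new
broker.subscribe(:some_handler, to: [:some_event])  # position 0
broker.subscribe_to_all_events(:global_handler)     # position 1
broker.subscribe(:other_handler, to: [:some_event]) # position 2
broker.subscribe(:some_handler, to: [:some_event])  # duplicate, stays at 0

order = broker.all_subscribers_for(:some_event)
p order
```

Here `order` is `[:some_handler, :global_handler, :other_handler]`, so the global handler runs between the two per-event handlers, in subscription order.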

swistak35 avatar Jun 25 '19 20:06 swistak35