
Reusing aggregate instances inside circular sync handlers leads to WrongExpectedEventVersion error

Open andrzejkrzywda opened this issue 7 years ago • 6 comments

Probably the code will tell it more clearly than I can explain ;)

In short: we have two aggregates with a circular dependency between them (via events). Once we started caching the first aggregate (simulated here with the global variable $aggregate_1), we noticed a WrongExpectedEventVersion exception.

The fix was to set @version and @unpublished_events manually.

class AggregateRootBugTest < ActiveSupport::TestCase

  def test_versioning_broken
    event_store = RailsEventStore::Client.new(
      repository: RubyEventStore::InMemoryRepository.new,
    )
    $aggregate_1 = Aggregate_1.new
    $aggregate_1.load("stream_1", event_store: event_store)

    event_store.subscribe(
      -> _ {
        aggregate = Aggregate_2.new
        aggregate.load("stream_2", event_store: event_store)
        aggregate.publish_event_2
        aggregate.store("stream_2", event_store: event_store)
      },
      to: [Event_1]
    )

    event_store.subscribe(
      -> _ {
        # uncommenting the code should fix the bug (and fail the test)
        # $aggregate_1.instance_variable_set(:@version, ($aggregate_1.instance_variable_get(:@version) || -1) + $aggregate_1.unpublished_events.size)
        # $aggregate_1.instance_variable_set(:@unpublished_events, [])
        $aggregate_1.do_something_in_reaction_to_event_2
        assert_raise(RubyEventStore::WrongExpectedEventVersion, bug_fixed_message) do
          $aggregate_1.store("stream_1", event_store: event_store)
        end
      },
      to: [Event_2]
    )
    $aggregate_1.publish_event_1

    $aggregate_1.store("stream_1", event_store: event_store)
  end

  def bug_fixed_message
    "If this test fails, it means that AggregateRoot has the bug fixed. "+
    "Go and remove the hack in Foo::Bar (global cache) " +
    "It has something to do with setting ivars for version and events"
  end

  class Aggregate_1
    include AggregateRoot

    def publish_event_1
      apply(Event_1.new(data: {}))
    end

    def do_something_in_reaction_to_event_2
    end

    def apply_event_1(event)
    end

  end


  class Aggregate_2
    include AggregateRoot

    def publish_event_2
      apply(Event_2.new(data: {}))
    end

    def apply_event_2(event)
    end

  end

  class Event_1 < RailsEventStore::Event
  end

  class Event_2 < RailsEventStore::Event
  end
end
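To spell out the mechanism: store bumps @version and clears unpublished_events only after publish_events returns, i.e. after every sync handler has already run, so a re-entrant store on the cached instance publishes with a stale expected_version. A minimal plain-Ruby sketch of that (hypothetical names, no RES dependency):

```ruby
# Minimal sketch (hypothetical names, no RES dependency) of why the
# nested store fails: the outer store bumps @version only after all
# sync handlers have run.
class ToyStore
  def initialize
    @streams = Hash.new { |hash, key| hash[key] = [] }
    @handlers = []
  end

  def subscribe(&handler)
    @handlers << handler
  end

  def publish(events, stream_name:, expected_version:)
    actual = @streams[stream_name].size - 1
    if expected_version != actual
      raise "WrongExpectedEventVersion: expected #{expected_version}, actual #{actual}"
    end
    @streams[stream_name].concat(events)
    # synchronous dispatch, exactly where the re-entrancy happens
    events.each { |event| @handlers.each { |handler| handler.call(event) } }
  end
end

class ToyAggregate
  def initialize
    @version = -1
    @unpublished_events = []
  end

  def apply(event)
    @unpublished_events << event
  end

  # Mirrors AggregateRoot#store: @version is bumped only AFTER publish
  # returns, i.e. after every sync handler has already run.
  def store(event_store, stream_name)
    event_store.publish(@unpublished_events, stream_name: stream_name, expected_version: @version)
    @version += @unpublished_events.size
    @unpublished_events = []
  end
end

event_store = ToyStore.new
aggregate = ToyAggregate.new

error = nil
event_store.subscribe do |_event|
  next unless error.nil?
  # Simulates the Event_2 handler: re-entrant store on the cached instance
  # while the outer store has not updated @version yet.
  aggregate.apply(:event_2)
  begin
    aggregate.store(event_store, "stream_1")
  rescue RuntimeError => e
    error = e
  end
end

aggregate.apply(:event_1)
aggregate.store(event_store, "stream_1")
puts error.message # => WrongExpectedEventVersion: expected -1, actual 0
```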

andrzejkrzywda avatar Apr 12 '18 09:04 andrzejkrzywda

Point 1: Circular sync handlers are rarely a good idea IMHO, so I am leaning towards won't fix.

paneq avatar Apr 12 '18 11:04 paneq

Point 2: I was thinking about a more optimistic approach.

Current code:

  def store(stream_name = @loaded_from_stream_name, event_store: default_event_store)
    event_store.publish_events(unpublished_events, stream_name: stream_name, expected_version: version)
    @version += unpublished_events.size
    @unpublished_events = nil
  end

Potential new code (might have bugs, just an idea):

  def store(stream_name = @loaded_from_stream_name, event_store: default_event_store)
    publish_those_events = unpublished_events.to_a
    version = @version
    @version += unpublished_events.size
    @unpublished_events = nil
    event_store.publish_events(publish_those_events, stream_name: stream_name, expected_version: version)
  rescue
    # restore state and bubble up
    @version -= publish_those_events.size
    @unpublished_events = publish_those_events
    raise
  end

However, that's not really good enough: the whole state reached by applying events is no longer correct, so you can still shoot yourself in the foot if you reuse that cached aggregate instance. On any exception the instance should be discarded from the cache. To incentivize that, we could use freeze and make the instance unusable.

  def store(stream_name = @loaded_from_stream_name, event_store: default_event_store)
    publish_those_events = unpublished_events.to_a
    version = @version
    @version += unpublished_events.size
    @unpublished_events = nil
    event_store.publish_events(publish_those_events, stream_name: stream_name, expected_version: version)
  rescue
    # restore state, freeze and bubble up
    @version -= publish_those_events.size
    @unpublished_events = publish_those_events
    freeze
    raise
  end
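A plain-Ruby sketch (all names made up, no RES dependency) of the restore-and-freeze idea: on failure the state is rolled back, the instance is frozen, and any later reuse of the cached instance raises FrozenError:

```ruby
# Plain-Ruby sketch (hypothetical names) of restore-and-freeze:
# roll the state back on failure, freeze the instance, re-raise.
class Outbox
  attr_reader :version, :pending

  def initialize
    @version = 0
    @pending = [:event_a, :event_b]
  end

  def append(event)
    @pending += [event]
  end

  def store(publisher)
    to_publish = @pending.to_a
    old_version = @version
    @version += to_publish.size
    @pending = []
    publisher.call(to_publish, old_version)
  rescue
    # restore state, freeze and bubble up
    @version = old_version
    @pending = to_publish
    freeze
    raise
  end
end

outbox = Outbox.new
failing_publisher = ->(_events, _version) { raise "simulated WrongExpectedEventVersion" }

begin
  outbox.store(failing_publisher)
rescue RuntimeError
  # state was rolled back before freezing
end

puts outbox.version         # => 0
puts outbox.pending.inspect # => [:event_a, :event_b]

# ...and the frozen instance now rejects any reuse:
begin
  outbox.append(:event_c)
rescue FrozenError => e
  puts "cache reuse rejected: #{e.class}" # => cache reuse rejected: FrozenError
end
```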

This, however, might conflict with the ideas in #107 and #140, which wanted to react to concurrency issues.

cc @andrzejkrzywda

paneq avatar Apr 12 '18 11:04 paneq

More thoughts from discussions with @andrzejkrzywda @fidel and @pawelpacana today:

  1. We should not care, for now, about the aggregate possibly being inconsistent when some error happens, because this inconsistency is possible even without the scenario mentioned in the first post:
class Order
  include AggregateRoot

  def some_action
    @something = 42
    raise SomeInvariantViolated if some_invariant_violated?
    apply SomethingHappened.new
  end

  private

  def apply_something_happened(_event)
    # setting some ivars etc.
  end
end

After calling Order.new.some_action we have @something set to 42, but the fact was not published. Depending on the use case, having that attribute set in a "cached" aggregate may or may not be considered a bug, but it's unlikely we can do anything about it without STM.

  2. What we can do is clear the unpublished_events and bump the version at another moment in the flow, for example just after storing the events in the repository but before running the handlers. That can't be done without modifications to the RubyEventStore API.

Rough spike provided one day by @andrzejkrzywda was:

def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any)
  enriched_events = enrich_events_metadata(events)
  serialized_events = serialize_events(enriched_events)
  append_to_stream_serialized_events(serialized_events, stream_name: stream_name, expected_version: expected_version)
  yield if block_given? # <-- the new part
  enriched_events.zip(serialized_events) do |event, serialized_event|
    with_metadata(
      correlation_id: event.metadata[:correlation_id] || event.event_id,
      causation_id:   event.event_id,
    ) do
      broker.(event, serialized_event)
    end
  end
  self
end

and then in aggregate root:

def store(stream_name = loaded_from_stream_name, event_store: default_event_store)
  event_store.publish(unpublished_events, stream_name: stream_name, expected_version: version) do
    @version += unpublished_events.size
    @unpublished_events = nil
  end
end

There's still a need to decide on the API for that:

  • yield, as shown above
  • a keyword argument with a "callback": def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any, after_repository_store: ->() {}). I don't particularly like such things, but I don't like yields where it's not obvious when they happen either, because of https://blog.arkency.com/a-scary-side-of-activerecords-find/
  • another suggestion?
  3. We've noticed that unpublished_events and version always change at the same time. That may suggest some abstraction combining these two values.
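A hypothetical sketch of such an abstraction (all names made up): an immutable value object pairing the version with the pending events, so neither can change without the other:

```ruby
# Hypothetical sketch: an immutable value object pairing the stream
# version with the pending events, so they can only change together.
class PendingChanges
  attr_reader :version, :events

  def initialize(version: -1, events: [])
    @version = version
    @events = events.freeze
  end

  def apply(event)
    PendingChanges.new(version: version, events: events + [event])
  end

  # Returns the events to publish plus the state to adopt once the
  # repository has confirmed the write.
  def flush
    [events, PendingChanges.new(version: version + events.size, events: [])]
  end
end

changes = PendingChanges.new.apply(:order_placed).apply(:order_paid)
to_publish, committed = changes.flush
puts to_publish.inspect # => [:order_placed, :order_paid]
puts committed.version  # => 1
```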

swistak35 avatar Jun 25 '19 21:06 swistak35

We should not care for now, about aggregate being possibly inconsistent

As a user you might be interested in handling that inconsistency. The easiest approach (at least for an event-sourced aggregate) is probably to reload the persisted events and re-source the aggregate state when such an error indicating possible corruption occurs.

mostlyobvious avatar Jun 25 '19 21:06 mostlyobvious

I don't like yields which are not obvious when are they happening either,

def foo(&block)
  block.call if block_given?
end

I think I'd still prefer block/yield for consistency.

mostlyobvious avatar Sep 17 '19 17:09 mostlyobvious

https://github.com/ko1/ractor-tvar STM is coming and might help here :)

mostlyobvious avatar Nov 20 '20 12:11 mostlyobvious