rails_event_store
Reusing aggregate instances inside circular sync handlers leads to WrongExpectedEventVersion error
The code will probably explain it more clearly than I can ;)
In short: we have two aggregates, and there's a circular dependency between them (via events).
Once we started caching the first aggregate (simulated here with $global_var), we noticed a WrongExpectedEventVersion exception.
The fix was to set the version and unpublished_events manually.
class AggregateRootBugTest < ActiveSupport::TestCase
  def test_versioning_broken
    event_store = RailsEventStore::Client.new(
      repository: RubyEventStore::InMemoryRepository.new,
    )
    $aggregate_1 = Aggregate_1.new
    $aggregate_1.load("stream_1", event_store: event_store)
    event_store.subscribe(
      -> _ {
        aggregate = Aggregate_2.new
        aggregate.load("stream_2", event_store: event_store)
        aggregate.publish_event_2
        aggregate.store("stream_2", event_store: event_store)
      },
      to: [Event_1]
    )
    event_store.subscribe(
      -> _ {
        # uncommenting the code should fix the bug (and fail the test)
        # $aggregate_1.instance_variable_set(:@version, ($aggregate_1.instance_variable_get(:@version) || -1) + $aggregate_1.unpublished_events.size)
        # $aggregate_1.instance_variable_set(:@unpublished_events, [])
        $aggregate_1.do_something_in_reaction_to_event_2
        assert_raise(RubyEventStore::WrongExpectedEventVersion, bug_fixed_message) do
          $aggregate_1.store("stream_1", event_store: event_store)
        end
      },
      to: [Event_2]
    )
    $aggregate_1.publish_event_1
    $aggregate_1.store("stream_1", event_store: event_store)
  end

  def bug_fixed_message
    "If this test fails, it means that AggregateRoot has the bug fixed. "+
    "Go and remove the hack in Foo::Bar (global cache) " +
    "It has something to do with setting ivars for version and events"
  end

  class Aggregate_1
    include AggregateRoot

    def publish_event_1
      apply(Event_1.new(data: {}))
    end

    def do_something_in_reaction_to_event_2
    end

    def apply_event_1(event)
    end
  end

  class Aggregate_2
    include AggregateRoot

    def publish_event_2
      apply(Event_2.new(data: {}))
    end

    def apply_event_2(event)
    end
  end

  class Event_1 < RailsEventStore::Event
  end

  class Event_2 < RailsEventStore::Event
  end
end
Point 1: Circular sync handlers are rarely a good idea imho, so I am leaning towards won't fix.
Point 2: I was thinking about a more optimistic approach.
Current code:
def store(stream_name = @loaded_from_stream_name, event_store: default_event_store)
  event_store.publish_events(unpublished_events, stream_name: stream_name, expected_version: version)
  @version += unpublished_events.size
  @unpublished_events = nil
end
Potential new code (might have bugs, just an idea)
def store(stream_name = @loaded_from_stream_name, event_store: default_event_store)
  publish_those_events = unpublished_events.to_a
  version = @version
  # bump the version and clear pending events before handlers get a chance to run
  @version += unpublished_events.size
  @unpublished_events = nil
  event_store.publish_events(publish_those_events, stream_name: stream_name, expected_version: version)
rescue
  # restore state and bubble up
  @version -= publish_those_events.size
  @unpublished_events = publish_those_events
  raise
end
However, that's not really good enough: the whole state reached by applying events is no longer correct, so you can still shoot yourself in the foot if you try to reuse that cached aggregate instance. On any exception the instance should be discarded from the cache. To incentivize that, we could use freeze and make the instance unusable.
def store(stream_name = @loaded_from_stream_name, event_store: default_event_store)
  publish_those_events = unpublished_events.to_a
  version = @version
  @version += unpublished_events.size
  @unpublished_events = nil
  event_store.publish_events(publish_those_events, stream_name: stream_name, expected_version: version)
rescue
  # restore state, freeze and bubble up
  @version -= publish_those_events.size
  @unpublished_events = publish_those_events
  freeze
  raise
end
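To show what freeze would mean for whoever holds the cached instance, here is a minimal sketch in plain Ruby. CachedCounter is a made-up stand-in, not the real AggregateRoot, and its store just runs a block in place of the event store call.

```ruby
# Hypothetical stand-in for a cached aggregate; not the real AggregateRoot module.
class CachedCounter
  attr_reader :version

  def initialize
    @version = -1
  end

  def bump
    @version += 1
  end

  # Mimics a failing store: freeze the instance, then let the error bubble up.
  def store
    yield
  rescue StandardError
    freeze
    raise
  end
end

counter = CachedCounter.new
counter.bump

begin
  counter.store { raise "conflict" } # stand-in for a failed publish
rescue RuntimeError
  # the caller still sees the original error
end

reuse_error = nil
begin
  counter.bump # any further ivar assignment on the frozen instance fails
rescue FrozenError => e
  reuse_error = e
end
```

Note that freeze is shallow: it blocks reassigning instance variables, but an array or hash already held in an ivar can still be mutated in place, so this is a nudge rather than a guarantee.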
This, however, might conflict with the ideas in #107 and #140, which wanted to react to concurrency issues.

cc @andrzejkrzywda
More thoughts from discussions with @andrzejkrzywda @fidel and @pawelpacana today:
- We should not care for now, about aggregate being possibly inconsistent when some error happens, because this inconsistency is possible even without the scenario mentioned in the first post:
class Order
  include AggregateRoot

  def some_action
    @something = 42
    raise SomeInvariantViolated if some_invariant_violated?
    apply SomethingHappened.new
  end

  private

  def apply_something_happened(_event)
    # setting some ivars etc.
  end
end
After calling Order.new.some_action with the invariant violated, we have @something set to 42, but the fact was never published. Depending on the use case, having that attribute set in a "cached" aggregate may or may not be considered a bug, but it's not likely we can do something about it without STM.
- What we can do is clear unpublished_events and bump version at another moment of the flow, for example just after storing the events in the repository but before running handlers. That can't be done without modifications to the RubyEventStore API.
Rough spike provided one day by @andrzejkrzywda was:
def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any)
  enriched_events = enrich_events_metadata(events)
  serialized_events = serialize_events(enriched_events)
  append_to_stream_serialized_events(serialized_events, stream_name: stream_name, expected_version: expected_version)
  # the new line: runs after the events are stored, before handlers are called
  yield if block_given?
  enriched_events.zip(serialized_events) do |event, serialized_event|
    with_metadata(
      correlation_id: event.metadata[:correlation_id] || event.event_id,
      causation_id: event.event_id,
    ) do
      broker.(event, serialized_event)
    end
  end
  self
end
and then in aggregate root:
def store(stream_name = loaded_from_stream_name, event_store: default_event_store)
  event_store.publish(unpublished_events, stream_name: stream_name, expected_version: version) do
    @version += unpublished_events.size
    @unpublished_events = nil
  end
end
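To check the ordering argument in isolation, here is a self-contained toy version of the spike. ToyEventStore, ToyAggregate and WrongExpectedVersion are made up for illustration and are not the real RubyEventStore or AggregateRoot classes; events are plain symbols. The only point is that the block runs after the append and before the handlers.

```ruby
# Toy stand-ins for illustration only; not the real RubyEventStore API.
class WrongExpectedVersion < StandardError; end

class ToyEventStore
  def initialize
    @streams  = Hash.new { |hash, key| hash[key] = [] }
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(handler, to:)
    @handlers[to] << handler
  end

  def publish(events, stream_name:, expected_version:)
    stream = @streams[stream_name]
    raise WrongExpectedVersion unless stream.size - 1 == expected_version
    stream.concat(events)
    yield if block_given? # caller updates its bookkeeping before handlers run
    events.each { |event| @handlers[event].each { |handler| handler.call(event) } }
    self
  end

  def read(stream_name)
    @streams[stream_name].dup
  end
end

class ToyAggregate
  def initialize(stream_name, event_store)
    @stream_name = stream_name
    @event_store = event_store
    @version = -1
    @unpublished_events = []
  end

  def record(event)
    @unpublished_events << event
  end

  def store
    @event_store.publish(@unpublished_events, stream_name: @stream_name, expected_version: @version) do
      @version += @unpublished_events.size
      @unpublished_events = [] # reassign, so the array being published stays intact
    end
  end
end

store = ToyEventStore.new
first = ToyAggregate.new("stream_1", store) # plays the role of the cached aggregate

# :event_1 triggers the second aggregate, which publishes :event_2 ...
store.subscribe(->(_event) {
  second = ToyAggregate.new("stream_2", store)
  second.record(:event_2)
  second.store
}, to: :event_1)

# ... and :event_2 makes the cached first aggregate store again, inside the first publish.
store.subscribe(->(_event) {
  first.record(:event_3)
  first.store
}, to: :event_2)

first.record(:event_1)
first.store
```

With the block moved after the handler loop instead, the nested first.store would still see version -1 and the toy store would raise WrongExpectedVersion, which mirrors the behaviour from the first post.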
There's still a need to decide on the API for that:
- yield, as shown above
- keyword argument with "callback": def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any, after_repository_store: ->() {}). I don't particularly like such things, but I don't like yields which are not obvious when are they happening either, because of https://blog.arkency.com/a-scary-side-of-activerecords-find/
- another suggestion?
- We've noticed that unpublished_events and version are always changed at the same time. That may suggest some abstraction combining these two values.
We should not care for now, about aggregate being possibly inconsistent
As a user you might be interested in handling that inconsistency. For an event sourced aggregate, it is probably easiest to reload the persisted events and rebuild the aggregate state from them whenever an error indicating possible corruption occurs.
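A rough sketch of that recovery, assuming a toy aggregate whose state is fully derived from its events. ToyOrder, the event hashes and the persisted array are made up for illustration; with the real library the events would be read back from the event store.

```ruby
# Toy event-sourced aggregate, illustrative only.
class ToyOrder
  attr_reader :total, :version

  def initialize
    @total = 0
    @version = -1
  end

  # Rebuilds a fresh instance purely from persisted events.
  def self.rebuild(events)
    events.each_with_object(new) { |event, order| order.apply(event) }
  end

  def apply(event)
    @total += event.fetch(:amount)
    @version += 1
  end
end

persisted = [{ amount: 10 }, { amount: 5 }] # what actually reached the store
cached = ToyOrder.rebuild(persisted)
cached.apply({ amount: 99 }) # in-memory change that never got stored

begin
  raise "store failed" # stand-in for a failing store call
rescue RuntimeError
  # drop the possibly corrupted instance and source a new one from persisted events
  cached = ToyOrder.rebuild(persisted)
end
```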
I don't like yields which are not obvious when are they happening either,
def foo(&block)
  block.call if block_given?
end
I think I'd still prefer block/yield for consistency.
https://github.com/ko1/ractor-tvar STM is coming and might help here :)