akka.net
akka.net copied to clipboard
attempting to fix racy Akka.Persistence.TestKit.Tests
Changes
Trying to fix a pair of specs with high flip rates in the Akka.NET test suite.
Checklist
For significant changes, please ensure that the following have been completed (delete if not relevant):
- [x] This change follows the Akka.NET API Compatibility Guidelines.
- [x] This change follows the Akka.NET Wire Compatibility Guidelines.
- [x] I have reviewed my own pull request.
[INFO][01/19/2024 15:34:01.299Z][Thread 0017][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][01/19/2024 15:34:01.299Z][Thread 0017][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][01/19/2024 15:34:01.438Z][Thread 0012][akka://test/user/counter] Starting up
[INFO][01/19/2024 15:34:06.494Z][Thread 0012][akka://test/user/counter] Received recover Akka.Persistence.RecoveryCompleted
[INFO][01/19/2024 15:34:06.496Z][Thread 0012][akka://test/user/counter] Received command inc
[DEBUG][01/19/2024 15:34:06.500Z][Thread 0037][ActorSystem(test)] System shutdown initiated
Took ~5 seconds to complete recovery with the Akka.Persistence.TestKit journal? Something's off then.
Well this is puzzling, but reflects the problem using logging statements:
[INFO][01/23/2024 22:19:51.557Z][Thread 0017][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][01/23/2024 22:19:51.557Z][Thread 0017][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][01/23/2024 22:19:51.592Z][Thread 0011][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[ERROR][01/23/2024 22:19:55.451Z][Thread 0013][Bug4762FixSpec (akka://test)] Error during execution of WithJournalWrite
Cause: Xunit.Sdk.FailException: Failed: Timeout 00:00:03 while waiting for a message of type Akka.Persistence.RecoveryCompleted
at Xunit.Assert.Fail(String message) in /_/src/xunit.assert/Asserts/FailAsserts.cs:line 34
at Akka.TestKit.Xunit2.XunitAssertions.Fail(String format, Object[] args) in /home/vsts/work/1/s/src/contrib/testkits/Akka.TestKit.Xunit2/XunitAssertions.cs:line 26
at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelopeAsync[T](Nullable`1 timeout, Action`2 assert, String hint, CancellationToken cancellationToken, Boolean shouldLog) in /home/vsts/work/1/s/src/core/Akka.TestKit/TestKitBase_Expect.cs:line 436
at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelopeAsync[T](Nullable`1 timeout, Action`1 msgAssert, Action`1 senderAssert, String hint, CancellationToken cancellationToken) in /home/vsts/work/1/s/src/core/Akka.TestKit/TestKitBase_Expect.cs:line 411
at Akka.TestKit.TestKitBase.InternalExpectMsgAsync[T](Nullable`1 timeout, Action`1 msgAssert, Action`1 senderAssert, String hint, CancellationToken cancellationToken) in /home/vsts/work/1/s/src/core/Akka.TestKit/TestKitBase_Expect.cs:line 379
at Akka.Persistence.TestKit.Tests.Bug4762FixSpec.<>c__DisplayClass5_0.<<TestJournal_PersistAll_should_only_count_each_event_exceptions_once>b__1>d.MoveNext() in /home/vsts/work/1/s/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs:line 93
--- End of stack trace from previous location ---
at Akka.Persistence.TestKit.PersistenceTestKit.WithJournalWrite(Func`2 behaviorSelector, Func`1 execution) in /home/vsts/work/1/s/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs:line 143
[INFO][01/23/2024 22:19:55.452Z][Thread 0013][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[DEBUG][01/23/2024 22:19:56.467Z][Thread 0035][ActorSystem(test)] System shutdown initiated
[INFO][01/23/2024 22:19:56.476Z][Thread 0035][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot foo loading using interceptor Noop
[INFO][01/23/2024 22:19:56.476Z][Thread 0035][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot foo loading using interceptor Noop
When that same test passes, the logs look like this:
[INFO][01/24/2024 00:02:36.465Z][Thread 0010][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][01/24/2024 00:02:36.484Z][Thread 0010][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][01/24/2024 00:02:36.536Z][Thread 0031][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[INFO][01/24/2024 00:02:36.593Z][Thread 0032][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot foo loading using interceptor Noop
[INFO][01/24/2024 00:02:36.594Z][Thread 0032][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot foo loading using interceptor Noop
[INFO][01/24/2024 00:02:36.638Z][Thread 0034][akka://test/user/$b] Received recover Akka.Persistence.RecoveryCompleted
[INFO][01/24/2024 00:02:36.665Z][Thread 0034][akka://test/user/$b] Received command Akka.Persistence.TestKit.Tests.Bug4762FixSpec+WriteMessage
[INFO][01/24/2024 00:02:36.704Z][Thread 0033][akka://test/system/akka.persistence.journal.test] Beginning write intercept of message Persistent<pid: foo, seqNr: 1, deleted: False, manifest: , sender: , payload: Akka.Persistence.TestKit.Tests.Bug4762FixSpec+TestEvent, writerGuid: b53d40b9-788c-491d-b676-9ae7dd3fd287> with interceptor Noop
[INFO][01/24/2024 00:02:36.704Z][Thread 0033][akka://test/system/akka.persistence.journal.test] Completed write intercept of message Persistent<pid: foo, seqNr: 1, deleted: False, manifest: , sender: , payload: Akka.Persistence.TestKit.Tests.Bug4762FixSpec+TestEvent, writerGuid: b53d40b9-788c-491d-b676-9ae7dd3fd287> with interceptor Noop
[INFO][01/24/2024 00:02:36.705Z][Thread 0033][akka://test/system/akka.persistence.journal.test] Beginning write intercept of message Persistent<pid: foo, seqNr: 2, deleted: False, manifest: , sender: , payload: Akka.Persistence.TestKit.Tests.Bug4762FixSpec+TestEvent, writerGuid: b53d40b9-788c-491d-b676-9ae7dd3fd287> with interceptor Noop
[INFO][01/24/2024 00:02:36.705Z][Thread 0033][akka://test/system/akka.persistence.journal.test] Completed write intercept of message Persistent<pid: foo, seqNr: 2, deleted: False, manifest: , sender: , payload: Akka.Persistence.TestKit.Tests.Bug4762FixSpec+TestEvent, writerGuid: b53d40b9-788c-491d-b676-9ae7dd3fd287> with interceptor Noop
[INFO][01/24/2024 00:02:39.722Z][Thread 0030][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[DEBUG][01/24/2024 00:02:39.781Z][Thread 0034][ActorSystem(test)] System shutdown initiated
[DEBUG][01/24/2024 00:02:39.831Z][Thread 0014][EventStream] Shutting down: StandardOutLogger started
[DEBUG][01/24/2024 00:02:39.831Z][Thread 0014][EventStream] All default loggers stopped
Akka.Persistence.TestKit.Tests.Bug4762FixSpec.TestJournal_PersistAll_should_only_count_each_event_exceptions_once is still failing with
[INFO][01/24/2024 02:01:24.467Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][01/24/2024 02:01:24.467Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][01/24/2024 02:01:24.565Z][Thread 0025][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[ERROR][01/24/2024 02:01:27.720Z][Thread 0036][Bug4762FixSpec (akka://test)] Error during execution of WithJournalWrite
Cause: Xunit.Sdk.FailException: Failed: Timeout 00:00:03 while waiting for a message of type Akka.Persistence.RecoveryCompleted
at Xunit.Assert.Fail(String message) in /_/src/xunit.assert/Asserts/FailAsserts.cs:line 34
at Akka.TestKit.Xunit2.XunitAssertions.Fail(String format, Object[] args) in D:\a\1\s\src\contrib\testkits\Akka.TestKit.Xunit2\XunitAssertions.cs:line 26
at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelopeAsync[T](Nullable`1 timeout, Action`2 assert, String hint, CancellationToken cancellationToken, Boolean shouldLog) in D:\a\1\s\src\core\Akka.TestKit\TestKitBase_Expect.cs:line 436
at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelopeAsync[T](Nullable`1 timeout, Action`1 msgAssert, Action`1 senderAssert, String hint, CancellationToken cancellationToken) in D:\a\1\s\src\core\Akka.TestKit\TestKitBase_Expect.cs:line 411
at Akka.TestKit.TestKitBase.InternalExpectMsgAsync[T](Nullable`1 timeout, Action`1 msgAssert, Action`1 senderAssert, String hint, CancellationToken cancellationToken) in D:\a\1\s\src\core\Akka.TestKit\TestKitBase_Expect.cs:line 379
at Akka.Persistence.TestKit.Tests.Bug4762FixSpec.<>c__DisplayClass5_0.<<TestJournal_PersistAll_should_only_count_each_event_exceptions_once>b__1>d.MoveNext() in D:\a\1\s\src\core\Akka.Persistence.TestKit.Tests\Bug4762FixSpec.cs:line 93
--- End of stack trace from previous location ---
at Akka.Persistence.TestKit.PersistenceTestKit.WithJournalWrite(Func`2 behaviorSelector, Func`1 execution) in D:\a\1\s\src\core\Akka.Persistence.TestKit.Xunit2\PersistenceTestKit.cs:line 143
[INFO][01/24/2024 02:01:27.720Z][Thread 0036][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[DEBUG][01/24/2024 02:01:29.754Z][Thread 0037][ActorSystem(test)] System shutdown initiated
[INFO][01/24/2024 02:01:29.770Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot foo loading using interceptor Noop
[INFO][01/24/2024 02:01:29.770Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot foo loading using interceptor Noop
So what is blocking the TestSnapshotStore from running in this case? Why does it only begin recovery as the ActorSystem is shutting down in these racy failure cases?
So what is blocking the
TestSnapshotStorefrom running in this case? Why does it only begin recovery as theActorSystemis shutting down in these racy failure cases?
That is a red herring, something is blocking the persistent actor under test thread right before recovery even started.
These are logs taken from 2 different tests, one is from Bug4762FixSpec and the other is from the CounterActorTests. I deliberately increased the Bug4762FixSpec timeout value to 10 seconds to see if the failure was caused by a lag or a total failure:
Bug4762FixSpec
[INFO][02/01/2024 23:26:52.474Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][02/01/2024 23:26:52.474Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][02/01/2024 23:26:52.585Z][Thread 0025][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[INFO][02/01/2024 23:26:52.598Z][Thread 0008][akka://test/user/$b] Starting up and beginning recovery
[INFO][02/01/2024 23:26:52.599Z][Thread 0008][akka://test/user/$b] Stashing [Akka.Persistence.TestKit.Tests.Bug4762FixSpec+WriteMessage]
[INFO][02/01/2024 23:26:52.603Z][Thread 0008][akka://test/system/recoveryPermitter] RequestRecoveryPermit received: [akka://test/user/$b#1976573591]
[INFO][02/01/2024 23:26:52.603Z][Thread 0008][akka://test/system/recoveryPermitter] Recovery granted: [akka://test/user/$b#1976573591]
[INFO][02/01/2024 23:26:52.603Z][Thread 0008][akka://test/user/$b] Recovery granted
[INFO][02/01/2024 23:26:54.112Z][Thread 0008][akka://test/user/$b] Telling SnapshotStore to load snapshot
[INFO][02/01/2024 23:26:54.112Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshot message received.
[INFO][02/01/2024 23:26:54.113Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] Starting LoadSnapshotAsync circuit breaker.
[INFO][02/01/2024 23:26:54.116Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] Invoking LoadAsync inside circuit breaker.
[INFO][02/01/2024 23:26:54.117Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot foo loading using interceptor Noop
[INFO][02/01/2024 23:26:54.118Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot foo loading using interceptor Noop
[INFO][02/01/2024 23:26:54.120Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshotAsync circuit breaker completed.
[DEBUG][02/01/2024 23:26:54.159Z][Thread 0008][akka://test/system/akka.persistence.journal.test/$a] Replay completed: RecoverySuccess<highestSequenceNr: 0>
[INFO][02/01/2024 22:33:44.671Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][02/01/2024 22:33:44.673Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][02/01/2024 22:33:44.679Z][Thread 0012][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[INFO][02/01/2024 22:33:44.736Z][Thread 0012][akka://test/user/$b] Starting up and beginning recovery
[INFO][02/01/2024 22:33:44.736Z][Thread 0012][akka://test/user/$b] Stashing [Akka.Persistence.TestKit.Tests.Bug4762FixSpec+WriteMessage]
[INFO][02/01/2024 22:33:44.758Z][Thread 0005][akka://test/system/recoveryPermitter] RequestRecoveryPermit received: [akka://test/user/$b#1999959018]
[INFO][02/01/2024 22:33:44.758Z][Thread 0005][akka://test/system/recoveryPermitter] Recovery granted: [akka://test/user/$b#1999959018]
[INFO][02/01/2024 22:33:44.758Z][Thread 0005][akka://test/user/$b] Recovery granted
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/user/$b] Telling SnapshotStore to load snapshot
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshot message received.
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.snapshot-store.test] Starting LoadSnapshotAsync circuit breaker.
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.snapshot-store.test] Invoking LoadAsync inside circuit breaker.
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot foo loading using interceptor Noop
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot foo loading using interceptor Noop
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshotAsync circuit breaker completed.
[DEBUG][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.journal.test/$a] Replay completed: RecoverySuccess<highestSequenceNr: 0>
CounterActorTests
[INFO][02/01/2024 23:26:54.209Z][Thread 0015][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][02/01/2024 23:26:54.209Z][Thread 0015][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][02/01/2024 23:26:54.217Z][Thread 0013][akka://test/system/akka.persistence.journal.test] Using write interceptor Failure
[INFO][02/01/2024 23:26:54.220Z][Thread 0013][ActorSystem(test)] Messaging actor
[INFO][02/01/2024 23:26:54.226Z][Thread 0013][akka://test/user/counter] Starting up
[INFO][02/01/2024 23:26:54.226Z][Thread 0013][akka://test/user/counter] Stashing [inc]
[INFO][02/01/2024 23:26:54.226Z][Thread 0013][akka://test/system/recoveryPermitter] RequestRecoveryPermit received: [akka://test/user/counter#459201627]
[INFO][02/01/2024 23:26:54.226Z][Thread 0013][akka://test/system/recoveryPermitter] Recovery granted: [akka://test/user/counter#459201627]
[INFO][02/01/2024 23:26:54.226Z][Thread 0013][akka://test/user/counter] Recovery granted
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/user/counter] Telling SnapshotStore to load snapshot
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshot message received.
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/system/akka.persistence.snapshot-store.test] Starting LoadSnapshotAsync circuit breaker.
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/system/akka.persistence.snapshot-store.test] Invoking LoadAsync inside circuit breaker.
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot test loading using interceptor Noop
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot test loading using interceptor Noop
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshotAsync circuit breaker completed.
[INFO][02/01/2024 23:26:55.131Z][Thread 0013][akka://test/user/counter] Received recover Akka.Persistence.RecoveryCompleted
[INFO][02/01/2024 21:48:16.654Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][02/01/2024 21:48:16.656Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][02/01/2024 21:48:16.661Z][Thread 0032][akka://test/system/akka.persistence.journal.test] Using write interceptor Failure
[INFO][02/01/2024 21:48:16.689Z][Thread 0032][ActorSystem(test)] Messaging actor
[INFO][02/01/2024 21:48:16.698Z][Thread 0007][akka://test/user/counter] Starting up
[INFO][02/01/2024 21:48:16.698Z][Thread 0007][akka://test/user/counter] Stashing [inc]
[INFO][02/01/2024 21:48:16.700Z][Thread 0032][akka://test/system/recoveryPermitter] RequestRecoveryPermit received: [akka://test/user/counter#79486081]
[INFO][02/01/2024 21:48:16.700Z][Thread 0032][akka://test/system/recoveryPermitter] Recovery granted: [akka://test/user/counter#79486081]
[INFO][02/01/2024 21:48:16.700Z][Thread 0032][akka://test/user/counter] Recovery granted
[ERROR][02/01/2024 21:48:21.633Z][Thread 0035][CounterActorTests (akka://test)] Error during execution of WithJournalWrite
Cause: Xunit.Sdk.FailException: Failed: Timeout 00:00:03 while waiting for a message of type Akka.Actor.Terminated Terminated [akka://test/user/counter#79486081].
at Xunit.Assert.Fail(String message) in /_/src/xunit.assert/Asserts/FailAsserts.cs:line 34
<SNIP>
at Xunit.Sdk.ExceptionAggregator.RunAsync(Func`1 code) in /_/src/xunit.core/Sdk/ExceptionAggregator.cs:line 90
[INFO][02/01/2024 21:48:21.634Z][Thread 0035][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[INFO][02/01/2024 21:48:21.654Z][Thread 0032][akka://test/user/counter] Telling SnapshotStore to load snapshot
[INFO][02/01/2024 21:48:21.654Z][Thread 0032][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshot message received.
Observations
- Note the logs timestamp before the "Recovery granted" log message from the test actor
- Note the logs timestamp after the "Telling SnapshotStore to load snapshot" log message
- They are all measured in miliseconds and sub miliseconds, which is what we expect on a properly behaving system
- Note the time it took between the "Recovery granted" and "Telling SnapshotStore to load snapshot" log message
- Notice that there is a noticeable 1-4 seconds lag between these two lines. On a very bad day, this lag can extend to up to 7 seconds
- These logs are taken from a slightly modified "Eventsourced.Lifecycle.cs" file:
- These anomalies only happened on the build server, both me and Aaron could not see them when we're running the tests locally on our machines
Note that there are no async-await between the two log lines, only a simple code.
The ChangeState(RecoveryStarted(recovery.ReplayMax)); code and be simplified even further by rewriting it to _currentState = RecoveryStarted(recovery.ReplayMax);
The RecoveryStarted() method returns a simple POCO class EventsourcedState that held a state, function delegate, and function expression to do message handling. The only computationally heavy code inside the RecoveryStarted() method is the code that retrieves the timeout value from within the HOCON configuration.
Changing the actor dispatcher to "akka.actor.internal-dispatcher" eliminates the start-up delay, could it be that there's something wrong inside "akka.actor.default-dispatcher" implementation?
Changing the actor dispatcher to "akka.actor.internal-dispatcher" eliminates the start-up delay, could it be that there's something wrong inside "akka.actor.default-dispatcher" implementation?
We implemented this in #7117 - let's see how it does
@Arkatufus looks to me like the dispatcher changes didn't have any impact