Rework CatchUp mechanism
The CatchUp mechanism was overhauled with #4289 but the current implementation still has some issues:
By default we use `Scripts::executeCommandAsync()` (via the `SubprocessProjectionCatchUpTrigger`) to trigger catch-ups.
This can lead to many sub requests if there are a lot of commands (e.g. multiple editors creating content simultaneously).
Apart from the obvious performance impact this can have, those sub requests might be killed if the parent request ends (or due to process limits).
Update: After going back and forth I now suggest:

- Switching to a synchronous catch-up mechanism
  - That basically means that `ContentRepository::handle()` will always be blocking until all projections are up to date
- All projections will always be updated even if they don't act on an event
- The `EventPersister` will simply trigger `ProjectionInterface::apply()` for every new published event (see the sketch below)
  - it will acquire a mutex to prevent race conditions if it was called in parallel in a separate process
- This will simplify checkpoint management – projections can now just update their own version and control transactions
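To make the suggestion a bit more concrete, here is a minimal sketch of such a blocking publish-then-apply flow. All type and method names (`EventStoreInterface`, `MutexInterface`, `loadAfter()`, …) are simplified placeholders for illustration, not the actual Neos ContentRepository APIs:

```php
<?php
declare(strict_types=1);

// Hedged sketch only – simplified placeholder types, NOT the actual Neos ContentRepository API.
// It illustrates the suggested flow: ContentRepository::handle() stays blocking because the
// EventPersister applies every committed event to all projections before returning, guarded
// by a mutex against parallel processes.

interface EventInterface {}

interface EventStoreInterface
{
    /** @param list<EventInterface> $events */
    public function commit(array $events): void;

    /** @return iterable<int, EventInterface> events keyed by sequence numbers greater than $position */
    public function loadAfter(int $position): iterable;
}

interface ProjectionInterface
{
    public function apply(EventInterface $event, int $sequenceNumber): void;

    /** the highest sequence number this projection has applied (its own checkpoint) */
    public function getPosition(): int;
}

interface MutexInterface
{
    public function acquire(): void;
    public function release(): void;
}

final class EventPersister
{
    /** @param list<ProjectionInterface> $projections */
    public function __construct(
        private readonly EventStoreInterface $eventStore,
        private readonly MutexInterface $mutex,
        private readonly array $projections,
    ) {
    }

    /** @param list<EventInterface> $events */
    public function publishEvents(array $events): void
    {
        $this->eventStore->commit($events);

        // Serialize catch-ups across processes to avoid applying events twice
        $this->mutex->acquire();
        try {
            foreach ($this->projections as $projection) {
                // Each projection tracks its own position and controls its own transaction
                foreach ($this->eventStore->loadAfter($projection->getPosition()) as $sequenceNumber => $event) {
                    $projection->apply($event, $sequenceNumber);
                }
            }
        } finally {
            $this->mutex->release();
        }
    }
}
```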
Considerations

- for larger scale a different `ProjectionCatchUpTriggerInterface` could interact with a background worker (e.g. via socket) but the catch-up would still be blocking
- we could always add some `ContentRepository::handleAsync()` later if we find that we really need it (but my feeling is that this only makes sense when we introduce a proper write side model for constraint checks etc.)
Current architecture

A sequence diagram of the current flow for `ContentRepository::handle()`:

```mermaid
sequenceDiagram
actor Client
Client->>CR: handle(cmd)
activate CR
CR->>EventPersister: publishEvents()
activate EventPersister
EventPersister->>EventStore: commit()
activate EventStore
EventStore-->>EventPersister: CommitResult
deactivate EventStore
loop
EventPersister->>CatchUpTrigger: triggerCatchUp
activate CatchUpTrigger
rect rgba(200, 200, 200, .1)
Note right of CatchUpTrigger: separate process
CatchUpTrigger--)SubProcessCatchupCommandController: catchupCommand()
activate SubProcessCatchupCommandController
deactivate CatchUpTrigger
SubProcessCatchupCommandController->>CR Registry: get()
CR Registry-->>SubProcessCatchupCommandController: CR
participant CR2 as CR
SubProcessCatchupCommandController->>CR2: catchUpProjection()
deactivate SubProcessCatchupCommandController
end
end
deactivate EventPersister
EventPersister-->>CR: CommandResult
CR-->>Client: CommandResult
deactivate CR
```
and `ContentRepository::catchUpProjection()` that is triggered in a separate process:

```mermaid
sequenceDiagram
actor Client
Client->>CR: catchUpProjection()
activate CR
CR->>EventStore: load()
activate EventStore
EventStore-->>CR: EventStream
deactivate EventStore
CR->>CheckpointStorage: acquireLock()
activate CheckpointStorage
CheckpointStorage-->>CR: SequenceNumber
loop
CR->>Projection: apply()
CR->>CheckpointStorage: updateAndReleaseLock()
CR->>CheckpointStorage: acquireLock()
CheckpointStorage-->>CR: SequenceNumber
end
deactivate CheckpointStorage
```
Suggested architecture
```mermaid
sequenceDiagram
actor Client
Client->>CR: handle(cmd)
activate CR
CR->>EventPersister: publishEvents()
activate EventPersister
EventPersister->>EventStore: commit()
activate EventStore
deactivate EventStore
EventPersister->>SubscriptionEngine: run()
activate SubscriptionEngine
Note over SubscriptionEngine: mark new/removed/retrying subscriptions
SubscriptionEngine->>EventStore: load()
activate EventStore
loop
EventStore-->>SubscriptionEngine: EventStream
SubscriptionEngine->>Subscriber (Projection): apply()
end
deactivate EventStore
deactivate SubscriptionEngine
deactivate EventPersister
```
The original ideas I wrote down in November 2023 when creating this issue:
```mermaid
sequenceDiagram
actor Client
Client->>CR: handle(cmd)
activate CR
CR->>EventPersister: publishEvents()
activate EventPersister
EventPersister->>EventStore: commit()
activate EventStore
EventStore-->>EventPersister: CommitResult
deactivate EventStore
EventPersister->>EventStore: load()
activate EventStore
EventStore-->>EventPersister: EventStream
deactivate EventStore
Note over EventPersister: acquire catch up lock
loop
EventPersister->>Projection: apply()
end
Note over EventPersister: release catch up lock
deactivate EventPersister
```
Options
Switch to synchronous
It might be an option to switch to a synchronous catch-up mechanism, but that carries the risk of leading to code that relies on that immediately consistent behavior – and thus breaks when scaled up to be used asynchronously.
=> Potential optimization for local dev and smaller projects, but dangerous and probably no general solution
Fork process
Instead of creating a fully fledged sub request, we could use PCNTL functions to fork the current process (along the lines of spatie/fork). Apparently this allows the scripts to stay alive even if the parent request was killed – but it works only on the CLI so we would still need at least one coordinating sub request.
=> Potential (optional!) performance improvement, but not a solution on its own
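For illustration, a bare-bones sketch of that fork idea using the raw PCNTL functions (roughly what libraries like spatie/fork build upon); this is an assumption-laden example, not a proposed implementation:

```php
<?php
// Requires ext-pcntl and only works on the CLI.
// Sketch: the child process performs the catch-up while the parent continues.
// A real implementation would also have to re-establish database connections
// in the child, handle signals, limit the number of children, etc.

$pid = pcntl_fork();
if ($pid === -1) {
    throw new \RuntimeException('Could not fork');
}

if ($pid === 0) {
    // child process: run the catch-up (placeholder call), then exit
    // $contentRepository->catchUpProjection(...);
    exit(0);
}

// parent process: continue immediately, or wait for the child if needed
pcntl_waitpid($pid, $status);
```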
Expose locking state of `CheckpointStorage` and prevent needless catch-ups

The `CheckpointStorageInterface` provides means to acquire an exclusive lock in order to prevent events from being applied multiple times to the same subscriber. If we could expose the locking state, we could skip catch-ups that are already being processed.
Some considerations
How to expose lock state?
To ensure that events are only applied once, we use an exclusive write lock on the `*_checkpoints` tables (see `DoctrineCheckpointStorage`). AFAIK it is not possible to determine the lock state without acquiring it, though. Maybe we can greatly simplify the locking mechanism by using a kind of 2-phase-commit like Laravel does (see `DatabaseLock` for example).
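For illustration only, a very reduced sketch of a row-based lock in that spirit – the table name, columns and class are made up, and this is not Laravel's actual implementation:

```php
<?php
declare(strict_types=1);

// Hedged sketch: a row-based lock similar in spirit to Laravel's DatabaseLock.
// Because the lock is an ordinary row, its state can be *read* (e.g. "is a catch-up
// currently running?") without having to acquire the lock first.
// Assumed table: locks(name VARCHAR PRIMARY KEY, owner VARCHAR, expires_at INT)

final class SimpleDatabaseLock
{
    public function __construct(
        private readonly \PDO $pdo,
        private readonly string $name,
        private readonly string $owner,
        private readonly int $ttlSeconds = 60,
    ) {
    }

    public function acquire(): bool
    {
        $expiresAt = time() + $this->ttlSeconds;
        try {
            // Fast path: no lock row exists yet
            $statement = $this->pdo->prepare('INSERT INTO locks (name, owner, expires_at) VALUES (?, ?, ?)');
            return $statement->execute([$this->name, $this->owner, $expiresAt]);
        } catch (\PDOException $exception) {
            // Row already exists: take it over only if it is expired (or already ours)
            $statement = $this->pdo->prepare(
                'UPDATE locks SET owner = ?, expires_at = ? WHERE name = ? AND (owner = ? OR expires_at < ?)'
            );
            $statement->execute([$this->owner, $expiresAt, $this->name, $this->owner, time()]);
            return $statement->rowCount() > 0;
        }
    }

    /** allows callers to inspect the lock state without acquiring it */
    public function isAcquired(): bool
    {
        $statement = $this->pdo->prepare('SELECT expires_at FROM locks WHERE name = ?');
        $statement->execute([$this->name]);
        $expiresAt = $statement->fetchColumn();

        return $expiresAt !== false && (int)$expiresAt >= time();
    }

    public function release(): void
    {
        $statement = $this->pdo->prepare('DELETE FROM locks WHERE name = ? AND owner = ?');
        $statement->execute([$this->name, $this->owner]);
    }
}
```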
"Queue" CatchUps
It won't suffice to skip a catch-up if it's already running because the events might have been read already. Instead the lock should consist of three states (see the sketch below the list):

- not acquired (= no catch-up in progress)
  - if this is the state, the catch-up should be invoked
- acquired (= catch-up in progress)
  - if this is the state, the catch-up should be queued and invoked as soon as the lock is released
- acquired & queued (= catch-up in progress, and queued for re-run)
  - if this is the state, we can safely skip the catch-up for the projection in question because it will be caught up anyways
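A minimal sketch of how a trigger could act on those three states – the enum, interface and class names below are made up for illustration and not part of the actual codebase:

```php
<?php
declare(strict_types=1);

// Hedged sketch: illustrative names only, not the actual Neos catch-up trigger.

enum CatchUpLockState
{
    case NOT_ACQUIRED;        // no catch-up in progress
    case ACQUIRED;            // catch-up in progress
    case ACQUIRED_AND_QUEUED; // catch-up in progress and already queued for a re-run
}

interface CatchUpLockInterface
{
    public function getState(): CatchUpLockState;

    /** mark that another catch-up run is required once the current one finishes */
    public function queue(): void;
}

final class QueueingCatchUpTrigger
{
    public function __construct(
        private readonly CatchUpLockInterface $lock,
        private readonly \Closure $startCatchUp, // e.g. spawns the sub process / background worker
    ) {
    }

    public function trigger(): void
    {
        match ($this->lock->getState()) {
            // no catch-up running: start one
            CatchUpLockState::NOT_ACQUIRED => ($this->startCatchUp)(),
            // a catch-up is running but might have read the new events already: queue a re-run
            CatchUpLockState::ACQUIRED => $this->lock->queue(),
            // a re-run is already queued, nothing to do
            CatchUpLockState::ACQUIRED_AND_QUEUED => null,
        };
    }
}
```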
Compare checkpoint sequence number
In some cases we could compare `CommitResult::highestCommittedSequenceNumber` with `CheckpointStorageInterface::getHighestAppliedSequenceNumber()` to skip catch-ups if the projection is already up to date. But this has to be handled with care because other commits might have been skipped under the assumption that the catch-up is still queued!
Related: #4388
Thanks for the write-up! I didn't think your suggestion through yet, but wanted to leave one thought already. Using the fork-approach might be tricky, as I assume that most hosting providers will not have the pcntl extension enabled. So, if that's a hard requirement, people will have trouble using Neos at a vanilla hosting provider.
Funnily enough, we do have it :D But yes, this is a separate issue and in the end doesn't really fix much for now, so we can probably think about it later.
From today's Weekly:

- Schnapsidee ("crazy idea") from @skurfuerst: Invalidate projections until they are explicitly blocked (in a single request)
- we change the default from async => sync (NO SUBPROCESSES anymore in this) 🟢 general agreement, if combined with the "Schnapsidee" :-D
  - for big scale → BG worker. 🟢 general agreement
  - ReactPHP / Promise / ...? 🟡 unsure, @bwaidelich will experiment.
  - Async case: test via behat (every night). 🟢 general agreement
  - specify on which projections we block. 🟢 general agreement
Note: I updated the suggested architecture in the issue description: The idea is to introduce the notion of subscriptions inspired by https://event-sourcing.patchlevel.io/latest/subscription/ with the following concepts:
Subscriber
The actual event handler – in our case basically a closure around `ProjectionInterface::apply()`
Subscription
The persisted state of a subscriber with
- a unique id
- a status (see below)
- the position in the event stream (i.e. `SequenceNumber`)
- some more internal details for debugging and error resolving
A subscription is basically a state machine with the states:

- `new` => newly discovered subscriber that wasn't booted yet
- `booting` => initial catch-up
- `active` => default state, new events will be applied
- `paused` => explicitly paused subscription, new events won't be applied (we won't need this)
- `finished` => finished one-time subscription, new events won't be applied (we won't need this)
- `detached` => subscription without a corresponding subscriber (i.e. code was removed)
- `error` => an error had occurred while applying an event to this subscription
The state transitions, and a possible simplification of them, were illustrated with state diagrams in the original comment.
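Since those diagrams are not reproduced here, the following is only a rough sketch of how the states could be modelled in code; the allowed transitions are assumptions derived from the state descriptions above, not the actual patchlevel/Neos rules:

```php
<?php
declare(strict_types=1);

// Hedged sketch: plausible modelling of the subscription states described above.
// The transition rules below are assumptions, not the actual state machine.

enum SubscriptionStatus: string
{
    case NEW = 'new';
    case BOOTING = 'booting';
    case ACTIVE = 'active';
    case PAUSED = 'paused';
    case FINISHED = 'finished';
    case DETACHED = 'detached';
    case ERROR = 'error';

    /** @return list<self> */
    public function allowedTransitions(): array
    {
        return match ($this) {
            self::NEW => [self::BOOTING, self::DETACHED, self::ERROR],
            self::BOOTING => [self::ACTIVE, self::FINISHED, self::ERROR],
            self::ACTIVE => [self::PAUSED, self::FINISHED, self::DETACHED, self::ERROR],
            self::PAUSED => [self::ACTIVE, self::DETACHED],
            self::ERROR => [self::BOOTING, self::ACTIVE, self::DETACHED],
            self::FINISHED, self::DETACHED => [],
        };
    }
}
```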
Subscription Engine
The central authority to manage subscription states and invoke subscribers.
It allows to `boot` subscriptions, but the main usage is to `run` it whenever events were published (and when replaying projections).
The process of the `run` call is (a code sketch follows after the list):
- `discoverNewSubscriptions` – create a new subscription record with state `new` for every subscriber that has no such record yet
- `markDetachedSubscriptions` – the counterpart: mark subscription records `detached` if no corresponding subscriber exists
- `retrySubscriptions` – reset subscriptions with state `error` to the state they had before the error if some "retry-strategy" allows (we can skip this part for now)
- load all `active` subscriptions and lock their records (`... FOR UPDATE`)
- find the active subscription with the lowest `position`
- load events starting from that position
- iterate all active subscriptions (skip if they are already further than the current position)
- apply the event (with `try`/`catch` and put the subscription in `error` state if that failed)
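A rough, non-authoritative sketch of what such a `run` call could look like, with all collaborating types reduced to simplified placeholders (this is neither the patchlevel nor the Neos implementation, and the new/detached/retry steps are only hinted at):

```php
<?php
declare(strict_types=1);

// Hedged sketch of the run() steps listed above – all types are simplified placeholders.

final class Subscription
{
    public function __construct(
        public readonly string $id,
        public string $status,   // 'new' | 'booting' | 'active' | 'detached' | 'error' | ...
        public int $position,    // last applied sequence number
    ) {
    }
}

interface SubscriberInterface
{
    public function handle(object $event): void;
}

interface EventStoreInterface
{
    /** @return iterable<int, object> events keyed by their sequence number */
    public function loadAfter(int $position): iterable;
}

interface SubscriptionStoreInterface
{
    /** @return list<Subscription> active subscriptions, locked via "... FOR UPDATE" */
    public function findActiveForUpdate(): array;

    /** @param list<Subscription> $subscriptions */
    public function persist(array $subscriptions): void;
}

final class SubscriptionEngine
{
    /** @param array<string, SubscriberInterface> $subscribers indexed by subscription id */
    public function __construct(
        private readonly SubscriptionStoreInterface $subscriptionStore,
        private readonly EventStoreInterface $eventStore,
        private readonly array $subscribers,
    ) {
    }

    public function run(): void
    {
        // discoverNewSubscriptions(), markDetachedSubscriptions() and retrySubscriptions()
        // are omitted in this sketch – see the list above.

        $subscriptions = $this->subscriptionStore->findActiveForUpdate();
        if ($subscriptions === []) {
            return;
        }

        // start from the lowest position of all active subscriptions
        $lowestPosition = min(array_map(static fn (Subscription $s) => $s->position, $subscriptions));

        foreach ($this->eventStore->loadAfter($lowestPosition) as $sequenceNumber => $event) {
            foreach ($subscriptions as $subscription) {
                // skip subscriptions that are already further than the current position (or errored)
                if ($subscription->status !== 'active' || $subscription->position >= $sequenceNumber) {
                    continue;
                }
                try {
                    $this->subscribers[$subscription->id]->handle($event);
                    $subscription->position = $sequenceNumber;
                } catch (\Throwable $exception) {
                    // applying failed: park this subscription in the "error" state
                    $subscription->status = 'error';
                }
            }
        }

        $this->subscriptionStore->persist($subscriptions);
    }
}
```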
For reference:
The original implementation: https://github.com/patchlevel/event-sourcing/blob/3.4.x/src/Subscription/Engine/DefaultSubscriptionEngine.php
and a WIP version of that from me: https://github.com/bwaidelich/dcb-library/blob/main/src/Subscription/Engine/SubscriptionEngine.php (IMO a bit easier to read but the DB lock is not nice yet)
With the merge of https://github.com/neos/neos-development-collection/pull/5321 I think we can successfully close this?