
Rework CatchUp mechanism

Open bwaidelich opened this issue 2 years ago • 5 comments

The CatchUp mechanism was overhauled with #4289 but the current implementation still has some issues:

By default we use Scripts::executeCommandAsync() (using the SubprocessProjectionCatchUpTrigger) to trigger catch ups. This can lead to many sub requests if there are a lot of commands (e.g. multiple editors creating content simultaneously).

Apart from the obvious performance issue this can cause, those sub requests might be killed if the parent request ends (or due to process limits).

Update: After going back and forth I now suggest:

  • Switching to a synchronous catch up mechanism
    • That basically means that ContentRepository::handle() will always be blocking until all projections are up to date
    • All projections will always be updated even if they don't act on an event
    • The EventPersister will simply trigger ProjectionInterface::apply() for every new published event
      • it will acquire a mutex to prevent race conditions if it was called in parallel in a separate process
    • This will simplify checkpoint management – projections can now just update their own version and control transactions
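A minimal sketch of the synchronous flow described in the list above, written in Python for brevity rather than in the PHP of the actual code base. The class names `Event` and `CountingProjection` are hypothetical, the in-memory list stands in for the event store, and `threading.Lock` stands in for whatever cross-process mutex would really be used. For simplicity the sketch holds the mutex across both the commit and the apply step, which is an assumption and not something the proposal prescribes.

```python
import threading
from dataclasses import dataclass, field


@dataclass
class Event:
    sequence_number: int
    type: str


@dataclass
class CountingProjection:
    """Hypothetical projection that manages its own position (checkpoint)."""
    position: int = 0
    applied: list = field(default_factory=list)

    def apply(self, event: Event) -> None:
        # Ignore events that were already applied to this projection
        if event.sequence_number <= self.position:
            return
        self.applied.append(event.type)
        self.position = event.sequence_number


class EventPersister:
    def __init__(self, projections):
        self._events = []               # stand-in for the event store
        self._projections = projections
        self._mutex = threading.Lock()  # stand-in for a cross-process mutex

    def publish_events(self, event_types):
        # Blocks until every projection has seen every new event
        with self._mutex:
            for event_type in event_types:
                event = Event(len(self._events) + 1, event_type)
                self._events.append(event)
                for projection in self._projections:
                    projection.apply(event)
```

Because `publish_events()` only returns after all projections were updated, a caller can read from any projection immediately afterwards.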

Considerations

  • for larger scale a different ProjectionCatchUpTriggerInterface could interact with a background worker (e.g. via socket) but the catch up would still be blocking
  • we could always add some ContentRepository::handleAsync() later if we find that we really need it (but my feeling is that this only makes sense when we introduce a proper write side model for constraint checks etc)

Current architecture

sequenceDiagram
    actor Client
    Client->>CR: handle(cmd)
    activate CR
    CR->>EventPersister: publishEvents()
    activate EventPersister
    EventPersister->>EventStore: commit()
    activate EventStore
    EventStore-->>EventPersister: CommitResult
    deactivate EventStore
    loop
      EventPersister->>CatchUpTrigger: triggerCatchUp
      activate CatchUpTrigger
      rect rgba(200, 200, 200, .1)
        Note right of CatchUpTrigger: separate process
        CatchUpTrigger--)SubProcessCatchupCommandController: catchupCommand()
        activate SubProcessCatchupCommandController
        deactivate CatchUpTrigger
        SubProcessCatchupCommandController->>CR Registry: get()
        CR Registry-->>SubProcessCatchupCommandController: CR
        participant CR2 as CR
        SubProcessCatchupCommandController->>CR2: catchUpProjection()
        deactivate SubProcessCatchupCommandController
      end
    end
    deactivate EventPersister
    EventPersister-->>CR: CommandResult
    CR-->>Client: CommandResult
    deactivate CR

sequenceDiagram
    actor Client
    Client->>CR: catchUpProjection()
    activate CR
    CR->>EventStore: load()
    activate EventStore
    EventStore-->>CR: EventStream
    deactivate EventStore
    CR->>CheckpointStorage: acquireLock()
    activate CheckpointStorage
    CheckpointStorage-->>CR: SequenceNumber
    loop
      CR->>Projection: apply()
      CR->>CheckpointStorage: updateAndReleaseLock()
      CR->>CheckpointStorage: acquireLock()
      CheckpointStorage-->>CR: SequenceNumber
    end
    deactivate CheckpointStorage

Suggested architecture

sequenceDiagram
    actor Client
    Client->>CR: handle(cmd)
    activate CR
    CR->>EventPersister: publishEvents()
    activate EventPersister
    EventPersister->>EventStore: commit()
    activate EventStore
    deactivate EventStore
    EventPersister->>SubscriptionEngine: run()
    activate SubscriptionEngine
    Note over SubscriptionEngine: mark new/removed/retrying subscriptions
    SubscriptionEngine->>EventStore: load()
    activate EventStore
    loop
      EventStore-->>SubscriptionEngine: EventStream
      SubscriptionEngine->>Subscriber (Projection): apply()
    end
    deactivate EventStore
    deactivate SubscriptionEngine
    deactivate EventPersister

The original ideas I wrote down in November 2023 when creating this issue:

sequenceDiagram
    actor Client
    Client->>CR: handle(cmd)
    activate CR
    CR->>EventPersister: publishEvents()
    activate EventPersister
    EventPersister->>EventStore: commit()
    activate EventStore
    EventStore-->>EventPersister: CommitResult
    deactivate EventStore
    EventPersister->>EventStore: load()
    activate EventStore
    EventStore-->>EventPersister: EventStream
    deactivate EventStore
    Note over EventPersister: acquire catch up lock
    loop
      EventPersister->>Projection: apply()
    end
    Note over EventPersister: release catch up lock
    deactivate EventPersister

Options

Switch to synchronous

It might be an option to switch to a synchronous catch-up mechanism, but that carries the considerable risk of leading to code that relies on immediately consistent behavior and thus breaks when scaled up to run asynchronously.

=> Potential optimization for local dev and smaller projects, but dangerous and probably no general solution

Fork process

Instead of creating a fully fledged sub request, we could use PCNTL functions to fork the current process (along the lines of spatie/fork). Apparently this allows the scripts to stay alive even if the parent request was killed – but it works only on the CLI so we would still need at least one coordinating sub request.

=> Potential (optional!) performance improvement, but not a solution on its own

Expose locking state of CheckpointStorage and prevent needless catch-ups

The CheckpointStorageInterface provides means to acquire an exclusive lock in order to prevent events from being applied multiple times to the same subscriber. If we could expose the locking state, we could skip catch-ups that are already being processed.

Some considerations

How to expose lock state?

To ensure that events are only applied once, we use an exclusive write lock on the *_checkpoints tables (see DoctrineCheckpointStorage). AFAIK it is not possible to determine the lock state without acquiring it, though.

Maybe we can greatly simplify the locking mechanism by using a kind of 2-phase-commit like Laravel does (see DatabaseLock for example).
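To illustrate the idea, here is a rough sketch of a lock that is stored as a plain table row with an owner and an expiration time, loosely modelled on that row-based approach. It is written in Python with SQLite purely for illustration; the table name `locks`, its columns, and the `RowLock` and `is_locked` names are hypothetical and not taken from Laravel or Neos. The point is that such a lock can be inspected with a regular `SELECT` without acquiring it.

```python
import sqlite3
import time
import uuid


def create_lock_table(connection):
    connection.execute(
        "CREATE TABLE locks (name TEXT PRIMARY KEY, owner TEXT NOT NULL, expiration REAL NOT NULL)"
    )


class RowLock:
    """Lock represented by a row: lock name, owner token and expiration timestamp."""

    def __init__(self, connection, name, ttl_seconds=60):
        self._connection = connection
        self._name = name
        self._ttl = ttl_seconds
        self._owner = uuid.uuid4().hex

    def acquire(self, now=None):
        now = time.time() if now is None else now
        try:
            # First attempt: insert the row, which fails if the lock already exists
            with self._connection:
                self._connection.execute(
                    "INSERT INTO locks (name, owner, expiration) VALUES (?, ?, ?)",
                    (self._name, self._owner, now + self._ttl),
                )
            return True
        except sqlite3.IntegrityError:
            # Second attempt: take over the row, but only if it has expired
            with self._connection:
                cursor = self._connection.execute(
                    "UPDATE locks SET owner = ?, expiration = ? WHERE name = ? AND expiration <= ?",
                    (self._owner, now + self._ttl, self._name, now),
                )
            return cursor.rowcount == 1

    def release(self):
        # Only the current owner may remove the row
        with self._connection:
            self._connection.execute(
                "DELETE FROM locks WHERE name = ? AND owner = ?", (self._name, self._owner)
            )


def is_locked(connection, name, now=None):
    """Expose the lock state without acquiring the lock."""
    now = time.time() if now is None else now
    row = connection.execute(
        "SELECT 1 FROM locks WHERE name = ? AND expiration > ?", (name, now)
    ).fetchone()
    return row is not None
```

The expiration column guards against locks that are never released because the owning process died, at the cost of having to pick a sensible timeout.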

"Queue" CatchUps

It won't suffice to skip a catch-up if it's already running because the events might have been read already. Instead the lock should consist of three states:

  • not acquired (= no catch-up in progress)
    • if this is the state, the catch-up should be invoked
  • acquired (= catch-up in progress)
    • if this is the state, the catch-up should be queued and invoked as soon as the lock is released
  • acquired & queued (= catch-up in progress, and queued for re-run)
    • if this is the state, we can safely skip the catch-up for the projection in question because it will be caught up anyways
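The three states above can be sketched as a small state machine. This is a single-process Python illustration only: `CatchUpCoordinator` and its return values are hypothetical names, and a real implementation would need to store the state somewhere shared and change it atomically (and reset it if the catch-up fails).

```python
from enum import Enum


class CatchUpState(Enum):
    NOT_ACQUIRED = "not acquired"
    ACQUIRED = "acquired"
    ACQUIRED_AND_QUEUED = "acquired & queued"


class CatchUpCoordinator:
    def __init__(self, catch_up):
        self._catch_up = catch_up  # callable that performs one catch-up run
        self.state = CatchUpState.NOT_ACQUIRED

    def trigger(self):
        if self.state is CatchUpState.ACQUIRED_AND_QUEUED:
            # A re-run is already queued, so this trigger can be dropped safely
            return "skipped"
        if self.state is CatchUpState.ACQUIRED:
            # A catch-up is running and may have read the events already: queue a re-run
            self.state = CatchUpState.ACQUIRED_AND_QUEUED
            return "queued"
        self.state = CatchUpState.ACQUIRED
        while True:
            self._catch_up()
            if self.state is CatchUpState.ACQUIRED_AND_QUEUED:
                # Someone queued a re-run while we were busy: run once more
                self.state = CatchUpState.ACQUIRED
                continue
            self.state = CatchUpState.NOT_ACQUIRED
            return "ran"
```

However many triggers arrive during a run, at most one additional run is queued, which is what limits the number of needless catch-ups.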

Compare checkpoint sequence number

In some cases we could compare CommitResult::highestCommittedSequenceNumber with CheckpointStorageInterface::getHighestAppliedSequenceNumber() to skip catch-ups if the projection is already up-to-date. But this has to be handled with care because other commits might have been skipped under the assumption that the catch-up is still queued!
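One possible reading of this check, as a Python sketch: the two sequence numbers correspond to the values mentioned above, while the `is_queued_rerun` flag is a hypothetical addition that encodes the caveat. It assumes that a run which was queued on behalf of other commits must never be skipped based on a single commit's sequence number.

```python
def can_skip_catch_up(highest_committed: int, highest_applied: int, is_queued_rerun: bool) -> bool:
    """Return True if a catch-up triggered by one commit is not needed."""
    if is_queued_rerun:
        # Other commits may have skipped their own catch-up because this run was queued,
        # so comparing against a single commit is not sufficient here
        return False
    # The projection has already applied everything this commit produced
    return highest_applied >= highest_committed
```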

Related: #4388

bwaidelich avatar Nov 13 '23 17:11 bwaidelich

A sequence diagram of the current flow for ContentRepository::handle():

[image: ESCR_catchup_01]

and for ContentRepository::catchUpProjection(), which is triggered in a separate process:

[image: ESCR_catchup_02]

bwaidelich avatar Nov 13 '23 17:11 bwaidelich

Thanks for the write-up! I didn't think your suggestion through yet, but wanted to leave one thought already. Using the fork approach might be tricky, as I assume that most hosting providers will not have the pcntl extension enabled. So, if that's a hard requirement, people will have trouble using Neos at a vanilla hosting provider.

robertlemke avatar Nov 14 '23 08:11 robertlemke

Thanks for the write-up! I didn't think your suggestion through yet, but wanted to leave one thought already. Using the fork approach might be tricky, as I assume that most hosting providers will not have the pcntl extension enabled. So, if that's a hard requirement, people will have trouble using Neos at a vanilla hosting provider.

Funnily we do have it :D But yes, this is a separate issue and in the end doesn't really fix much for now, so we can probably think about it later.

kitsunet avatar Nov 14 '23 10:11 kitsunet

From today's Weekly:

  • Schnapsidee (a half-serious, "crazy" idea) from @skurfuerst: Invalidate projections until they are explicitly blocked (in a single request)
  • we change the default from async => sync (NO SUBPROCESSES anymore in this) 🟢 general agreement, if combined with "Schnapsidee" :-D
    • for big scale → BG worker. 🟢 general agreement
    • ReactPHP / Promise / ... ? - 🟡 unsure, @bwaidelich will experiment.
    • Async case: test via behat. (every night) 🟢 general agreement
    • specify on which projections we block. 🟢 general agreement

bwaidelich avatar Feb 23 '24 12:02 bwaidelich

Note: I updated the suggested architecture in the issue description: The idea is to introduce the notion of subscriptions inspired by https://event-sourcing.patchlevel.io/latest/subscription/ with the following concepts:

Subscriber

The actual event handler – in our case basically a closure around ProjectionInterface::apply()

Subscription

The persisted state of a subscriber with

  • a unique id
  • a status (see below)
  • the position in the event stream (i.e. SequenceNumber)
  • some more internal details for debugging and error resolving

A subscription is basically a state machine with the states

  • new => newly discovered subscriber that wasn't booted yet
  • booting => initial catchup
  • active => default state, new events will be applied
  • paused => explicitly paused subscription, new events won't be applied (we won't need this)
  • finished => finished one-time subscription, new events won't be applied (we won't need this)
  • detached => subscription without a corresponding subscriber (i.e. code was removed)
  • error => an error occurred while applying an event to this subscription

The state transitions are: [image: state transition diagram]

But we can probably simplify it to something like: [image: simplified state transition diagram]

Subscription Engine

The central authority to manage subscription states and invoke subscribers. It allows booting subscriptions, but its main use is to run it whenever events were published (and when replaying projections). The process of the run call is:

  1. discoverNewSubscriptions - create a new subscription record with state new for every subscriber that has no such record yet
  2. markDetachedSubscriptions - the counterpart: mark subscription records detached if no corresponding subscriber exists
  3. retrySubscriptions - reset subscriptions with state error to the state they had before the error if some "retry-strategy" allows (we can skip this part for now)
  4. load all active subscriptions and lock their records (...FOR UPDATE)
  5. find the active subscription with the lowest position
  6. load events starting from that position
  7. iterate all active subscriptions (skip if they are already further than the current position)
  8. apply the event (with try/catch and put the subscription in error state if that failed)
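The run steps above can be sketched roughly as follows. This is an in-memory Python illustration, not the patchlevel or Neos implementation: the class and method names are hypothetical, events are plain strings whose sequence number is their list index plus one, the retry step and the `...FOR UPDATE` row locking are left out, and the new => booting => active transition in `boot()` is an assumption derived from the state descriptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, List


class Status(Enum):
    NEW = "new"
    BOOTING = "booting"
    ACTIVE = "active"
    DETACHED = "detached"
    ERROR = "error"


@dataclass
class Subscription:
    id: str
    status: Status = Status.NEW
    position: int = 0  # sequence number of the last applied event
    error: str = ""


class SubscriptionEngine:
    def __init__(self, events: List[str], subscribers: Dict[str, Callable[[str], None]]):
        self.events = events
        self.subscribers = subscribers
        self.subscriptions: Dict[str, Subscription] = {}

    def boot(self) -> None:
        """Initial catch-up for newly discovered subscriptions."""
        self._discover_and_mark_detached()
        booting = [s for s in self.subscriptions.values() if s.status is Status.NEW]
        for subscription in booting:
            subscription.status = Status.BOOTING
        self._apply_events(booting, Status.BOOTING)
        for subscription in booting:
            if subscription.status is Status.BOOTING:
                subscription.status = Status.ACTIVE

    def run(self) -> None:
        self._discover_and_mark_detached()                      # steps 1 and 2
        active = [s for s in self.subscriptions.values() if s.status is Status.ACTIVE]  # step 4
        self._apply_events(active, Status.ACTIVE)               # steps 5 to 8

    def _discover_and_mark_detached(self) -> None:
        for subscriber_id in self.subscribers:
            self.subscriptions.setdefault(subscriber_id, Subscription(subscriber_id))
        for subscription in self.subscriptions.values():
            if subscription.id not in self.subscribers:
                subscription.status = Status.DETACHED

    def _apply_events(self, subscriptions: List[Subscription], expected: Status) -> None:
        if not subscriptions:
            return
        # Start from the lowest position among the given subscriptions
        start = min(s.position for s in subscriptions)
        for sequence_number, event in enumerate(self.events[start:], start=start + 1):
            for subscription in subscriptions:
                # Skip subscriptions that failed or are already further along
                if subscription.status is not expected or subscription.position >= sequence_number:
                    continue
                try:
                    self.subscribers[subscription.id](event)
                    subscription.position = sequence_number
                except Exception as exception:
                    subscription.status = Status.ERROR
                    subscription.error = str(exception)
```

A failing subscriber only puts its own subscription into the error state; the remaining subscriptions keep receiving events, and the failed one keeps the position of the last event it applied successfully.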

For reference:

The original implementation: https://github.com/patchlevel/event-sourcing/blob/3.4.x/src/Subscription/Engine/DefaultSubscriptionEngine.php

and a WIP version of that from me: https://github.com/bwaidelich/dcb-library/blob/main/src/Subscription/Engine/SubscriptionEngine.php (IMO a bit easier to read but the DB lock is not nice yet)

bwaidelich avatar Oct 19 '24 16:10 bwaidelich

With the merge of https://github.com/neos/neos-development-collection/pull/5321 I think we can successfully close this?

mhsdesign avatar Jan 20 '25 10:01 mhsdesign