crdt-examples

Pure-op based CRDT, ORSet: 'Remove' operations are being pruned before being applied

Open ekkaiasmith opened this issue 1 year ago • 0 comments

The following test (which belongs in Crdt.Tests/commutative/pure/ORSetTests.fs) fails because the ORSet.remove 1 a operation is pruned from the PO-Log (state.Unstable) before it is applied. As a result, the value 1 is never removed from the state.

test "What happens when a new op obsoletes a remove op before it is applied" {
        use sys = System.create "sys" <| Configuration.parse "akka.loglevel = DEBUG"

        let a = spawn sys "A" <| props (ORSet.props "A")
        let b = spawn sys "B" <| props (ORSet.props "B")
        a <! Connect("B", b)
        b <! Connect("A", a)

        let state = ORSet.add 1 a |> wait
        let state = ORSet.add 2 b |> wait

        Thread.Sleep 500

        let state = ORSet.remove 1 a |> wait
        Expect.equal state (Set.singleton 2) "A returns set without removed value"
        let state = ORSet.add 3 b |> wait
        
        Thread.Sleep 500
        
        let s1 = ORSet.query a |> wait
        let s2 = ORSet.query b |> wait
        
        Expect.equal s1 s2 "After replication both sides should have the same state"
        Expect.equal s1 (Set.ofList [2;3]) "On concurrent conflicting update, add wins"
}

I added some logs to make it clearly visible when the operation is removed:

[Screenshot 2024-01-31 at 15:26:26] [Screenshot 2024-01-31 at 15:27:48]

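The problem seems to come from the Obsoletes implementation of the OR-Set (Crdt/commutative/pure/ORSet.fs). Its Remove _, _ case returns true for any pair of operations, so every incoming operation obsoletes an unstable Remove: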

member this.Obsoletes(o, n) =
    match n.Value, o.Value with
    | Add v2, Add v1 when Version.compare n.Version o.Version = Ord.Lt -> v1 = v2    // add v2 is obsolete when its lower than another add
    | Add v2, Remove v1 when Version.compare n.Version o.Version = Ord.Lt -> v1 = v2 // add v2 is obsolete when its lower than another remove
    | Remove _, _ -> true // since o can be only lower or concurrent, it's always losing (add-wins)
    | _ -> false

In addition, the PO-Log is pruned before the operations are applied (Crdt/commutative/pure/Replication.fs), so a Remove that is pruned while still unstable never reaches crdt.Apply:

        for op in actual do
          let observed = state.Observed |> MTime.update nodeId op.Version
          // check if new incoming event is not obsolete
          let obsolete = state.Unstable |> Set.exists(fun o -> crdt.Obsoletes(o, op))

          // HERE --> prune unstable operations, which have been obsoleted by incoming event
          let pruned = state.Unstable |> Set.filter (fun o -> not (crdt.Obsoletes(op, o)))

          state <- { state with
                       Observed = observed
                       Unstable = if obsolete then pruned else Set.add op pruned
                       LatestVersion = Version.merge op.Version state.LatestVersion }
        
        // check if some of unstable events have stabilized now  
        let stableOps, unstableOps, stableVersion = stabilize state
        let state =
          if not (Set.isEmpty stableOps) then
            logDebugf ctx "stabilized state %A over %O (stable: %O, unstable: %O)" state.Stable stableVersion stableOps unstableOps
            // HERE --> operations are applied after pruning
            let stable = crdt.Apply(state.Stable, stableOps)
            { state with Stable = stable; Unstable = unstableOps; StableVersion = stableVersion }
          else
            state
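
The interaction between the two snippets above can be reproduced without Akka in a small standalone model. This is only a sketch under assumptions: the Version alias, happenedBefore, obsoletes and receive below are hypothetical stand-ins for the repository's Version.compare, crdt.Obsoletes and the pruning loop, and the version vectors are made up for the scenario (Add 1 and Add 2 are assumed to be already stable).

```fsharp
// Hypothetical, simplified model of the PO-Log pruning step (not the repository code).
type Op =
    | Add of int
    | Remove of int

type Version = Map<string, int>

type Versioned = { Version: Version; Value: Op }

let private counter (k: string) (v: Version) =
    Map.tryFind k v |> Option.defaultValue 0

// true when version `a` is strictly lower than version `b` (stand-in for Ord.Lt)
let happenedBefore (a: Version) (b: Version) =
    let keys =
        Seq.append (Map.toSeq a) (Map.toSeq b)
        |> Seq.map fst
        |> Seq.distinct
        |> List.ofSeq
    let allLeq = keys |> List.forall (fun k -> counter k a <= counter k b)
    let someLt = keys |> List.exists (fun k -> counter k a < counter k b)
    allLeq && someLt

// Same case analysis as the Obsoletes member quoted above
let obsoletes (o: Versioned) (n: Versioned) =
    match n.Value, o.Value with
    | Add v2, Add v1 when happenedBefore n.Version o.Version -> v1 = v2
    | Add v2, Remove v1 when happenedBefore n.Version o.Version -> v1 = v2
    | Remove _, _ -> true
    | _ -> false

// Same shape as the body of the `for op in actual do` loop, using a list as the log
let receive (unstable: Versioned list) (op: Versioned) =
    let obsolete = unstable |> List.exists (fun o -> obsoletes o op)
    let pruned = unstable |> List.filter (fun o -> not (obsoletes op o))
    if obsolete then pruned else op :: pruned

// Scenario on node A: Add 1 and Add 2 are already stable, so the log starts empty.
let remove1 = { Version = Map.ofList [ "A", 2; "B", 1 ]; Value = Remove 1 }
let add3 = { Version = Map.ofList [ "A", 1; "B", 2 ]; Value = Add 3 }

let logAfterRemove = receive [] remove1          // Remove 1 enters the log
let logAfterAdd = receive logAfterRemove add3    // Add 3 prunes Remove 1

// Remove 1 is gone before it could stabilize; only Add 3 is left in the log
printfn "%A" logAfterAdd
```

In this model, Remove 1 disappears from the log as soon as Add 3 arrives, even though the two operations concern different values, which matches what the logs in the screenshots show.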

ekkaiasmith • Jan 31 '24 15:01