crdt-examples
crdt-examples copied to clipboard
Pure-op based CRDT, ORSet: 'Remove' operations are being pruned before being applied
The following test (which should take place in Crdt.Tests/commutative/pure/ORSetTests.fs
) fails because the ORSet.remove 1 a
operation is pruned from the PO-Log (state.Unstable
) before it is applied. Hence, the 1
value is never removed from the state.
test "What happens when a new op obsolete a remove op before it is applied" {
use sys = System.create "sys" <| Configuration.parse "akka.loglevel = DEBUG"
let a = spawn sys "A" <| props (ORSet.props "A")
let b = spawn sys "B" <| props (ORSet.props "B")
a <! Connect("B", b)
b <! Connect("A", a)
let state = ORSet.add 1 a |> wait
let state = ORSet.add 2 b |> wait
Thread.Sleep 500
let state = ORSet.remove 1 a |> wait
Expect.equal state (Set.singleton 2) "A returns set without removed value"
let state = ORSet.add 3 b |> wait
Thread.Sleep 500
let s1 = ORSet.query a |> wait
let s2 = ORSet.query b |> wait
Expect.equal s1 s2 "After replication both sides should have the same state"
Expect.equal s1 (Set.ofList [2;3]) "On concurrent conflicting update, add wins"
}
I added some logs to make it clearly visible when the operation is removed:
It seems to come from the obsoletes
method implementation in the OR-Set (Crdt/commutative/pure/ORSet.fs
):
member this.Obsoletes(o, n) =
match n.Value, o.Value with
| Add v2, Add v1 when Version.compare n.Version o.Version = Ord.Lt -> v1 = v2 // add v2 is obsolete when its lower than another add
| Add v2, Remove v1 when Version.compare n.Version o.Version = Ord.Lt -> v1 = v2 // add v2 is obsolete when its lower than another remove
| Remove _, _ -> true // since o can be only lower or concurrent, it's always losing (add-wins)
| _ -> false
Also, PO-Log's event pruning is done before the operations are applied (Crdt/commutative/pure/Replication.fs
):
for op in actual do
let observed = state.Observed |> MTime.update nodeId op.Version
// check if new incoming event is not obsolete
let obsolete = state.Unstable |> Set.exists(fun o -> crdt.Obsoletes(o, op))
// HERE --> prune unstable operations, which have been obsoleted by incoming event
let pruned = state.Unstable |> Set.filter (fun o -> not (crdt.Obsoletes(op, o)))
state <- { state with
Observed = observed
Unstable = if obsolete then pruned else Set.add op pruned
LatestVersion = Version.merge op.Version state.LatestVersion }
// check if some of unstable events have stabilized now
let stableOps, unstableOps, stableVersion = stabilize state
let state =
if not (Set.isEmpty stableOps) then
logDebugf ctx "stabilized state %A over %O (stable: %O, unstable: %O)" state.Stable stableVersion stableOps unstableOps
// HERE --> operations are applied after pruning
let stable = crdt.Apply(state.Stable, stableOps)
{ state with Stable = stable; Unstable = unstableOps; StableVersion = stableVersion }
else
state