Akkling icon indicating copy to clipboard operation
Akkling copied to clipboard

ddata.AsyncUpdate replaces old data with new instead of mergin them

Open vasily-kirichenko opened this issue 7 years ago • 2 comments

I'm trying to use PNCounterDictionary: (node 1)

let cluster = Cluster.Get system
let ddata = DistributedData.Get system
let key = PNCounterDictionaryKey "test"
    
async {
    for n in 1..10_000_000 do
        let counterKey = string (n % 10)
        do! ddata.AsyncUpdate(key, PNCounterDictionary.Empty.Increment(cluster, counterKey), Consistency.writeLocal)  
        do! Async.Sleep 500
} 
|> Async.RunSynchronously

(node 2)

let ddata = DistributedData.Get system
let key: PNCounterDictionaryKey<string> = PNCounterDictionaryKey "test"

props (
    let loop (ctx: Actor<Msg>) =
        function
        | Tick ->
            ActorTaskScheduler.RunTask (fun () ->
                async {
                    let! counters = ddata.AsyncGet(key, Consistency.readLocal)
                    ctx.Self <! GotCounter counters
                }
                |> Async.StartAsTask
                |> fun x -> x :> Task)
            ignored()
        
        | GotCounter counter ->
            logInfof ctx "Counters! %A" counter
            ctx.Schedule (TimeSpan.FromSeconds 1.0) ctx.Self Tick |> ignore
            ignored()
            
    fun (ctx: Actor<Msg>) ->
        ctx.Self <! Tick
        become (loop ctx)
) 
|> spawnAnonymous system
|> ignore

output (node 2):

[INFO][05-Jan-18 10:18:42][Thread 0019][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[6, 1]])
[INFO][05-Jan-18 10:18:43][Thread 0019][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[6, 1]])
[INFO][05-Jan-18 10:18:44][Thread 0020][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[0, 1]])
[INFO][05-Jan-18 10:18:45][Thread 0005][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[0, 1]])
[INFO][05-Jan-18 10:18:46][Thread 0019][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[4, 1]])
[INFO][05-Jan-18 10:18:47][Thread 0005][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[4, 1]])
[INFO][05-Jan-18 10:18:48][Thread 0026][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[8, 1]])
[INFO][05-Jan-18 10:18:49][Thread 0003][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[8, 1]])
[INFO][05-Jan-18 10:18:50][Thread 0019][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[2, 1]])

As I understand, the map is replaced in every ddata.AsyncUpdate(key, PNCounterDictionary.Empty.Increment(cluster, counterKey), Consistency.writeLocal) with a new PNCounterDictionary (containing single key). I expect it to be merged with the previous map.

The questions are:

  • am I doing updating right?
  • if I am, is it a bug?

Also, it would be very useful if some ddata usage examples were added in form of docs or example snippets.

C# DData DSL works:

async {
    for n in 1..10_000_000 do
        let counterKey = string (n % 10)
        
        ddata.Replicator.Tell(
            Dsl.Update(
                key, 
                PNCounterDictionary.Empty, 
                Consistency.writeLocal, 
                fun existing -> existing.Increment(cluster, counterKey)))
       
        do! Async.Sleep 500
} 
function
| Tick ->
    ActorTaskScheduler.RunTask (fun () ->
        async {
            let! counters = ddata.AsyncGet(key, Consistency.readLocal)
            ctx.Self <! GotCounter counters
        }
        |> Async.StartAsTask
        |> fun x -> x :> Task)
    ignored()

| GotCounter counters ->
    match counters with
    | Some counters ->
        counters.Entries
        |> Seq.map (fun (KeyValue(k, v)) -> sprintf "%s -> %O" k v)
        |> String.concat "\n"
        |> logInfof ctx "Counters!\n%s"
    | None ->
        logInfof ctx "Got null counters"
        
    ctx.Schedule (TimeSpan.FromSeconds 1.0) ctx.Self Tick |> ignore
    ignored()
[INFO][05-Jan-18 12:51:20][Thread 0003][[akka://TestSystem/user/$a#1434317428]] Counters!
5 -> 65
4 -> 65
7 -> 65
6 -> 65
1 -> 65
0 -> 64
3 -> 65
2 -> 65
9 -> 64
8 -> 64

vasily-kirichenko avatar Jan 07 '18 15:01 vasily-kirichenko

Not 100% sure it's related but found interesting behaviour on the ORSet with lmdb enabled. The AsyncGet (and GetAsync directly - same story) doesn't load the saved state from the db but AsyncGet followed by AsyncUpdate (with returned state) followed by AsyncGet gets everything loaded.

cotyar avatar Jan 08 '18 18:01 cotyar

ORMultiMap has the same behaviour - AsyncGet doesn't read the storage until AsyncUpdate. Do I need to send some kind of a warm-up message to the replicator maybe? Sorry if I'm missing something obvious. Running it all with the latest Akka beta

cotyar avatar Jan 10 '18 21:01 cotyar