Akkling
Akkling copied to clipboard
ddata.AsyncUpdate replaces old data with new instead of mergin them
I'm trying to use PNCounterDictionary
:
(node 1)
let cluster = Cluster.Get system
let ddata = DistributedData.Get system
let key = PNCounterDictionaryKey "test"
async {
for n in 1..10_000_000 do
let counterKey = string (n % 10)
do! ddata.AsyncUpdate(key, PNCounterDictionary.Empty.Increment(cluster, counterKey), Consistency.writeLocal)
do! Async.Sleep 500
}
|> Async.RunSynchronously
(node 2)
let ddata = DistributedData.Get system
let key: PNCounterDictionaryKey<string> = PNCounterDictionaryKey "test"
props (
let loop (ctx: Actor<Msg>) =
function
| Tick ->
ActorTaskScheduler.RunTask (fun () ->
async {
let! counters = ddata.AsyncGet(key, Consistency.readLocal)
ctx.Self <! GotCounter counters
}
|> Async.StartAsTask
|> fun x -> x :> Task)
ignored()
| GotCounter counter ->
logInfof ctx "Counters! %A" counter
ctx.Schedule (TimeSpan.FromSeconds 1.0) ctx.Self Tick |> ignore
ignored()
fun (ctx: Actor<Msg>) ->
ctx.Self <! Tick
become (loop ctx)
)
|> spawnAnonymous system
|> ignore
output (node 2):
[INFO][05-Jan-18 10:18:42][Thread 0019][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[6, 1]])
[INFO][05-Jan-18 10:18:43][Thread 0019][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[6, 1]])
[INFO][05-Jan-18 10:18:44][Thread 0020][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[0, 1]])
[INFO][05-Jan-18 10:18:45][Thread 0005][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[0, 1]])
[INFO][05-Jan-18 10:18:46][Thread 0019][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[4, 1]])
[INFO][05-Jan-18 10:18:47][Thread 0005][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[4, 1]])
[INFO][05-Jan-18 10:18:48][Thread 0026][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[8, 1]])
[INFO][05-Jan-18 10:18:49][Thread 0003][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[8, 1]])
[INFO][05-Jan-18 10:18:50][Thread 0019][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[2, 1]])
As I understand, the map is replaced in every ddata.AsyncUpdate(key, PNCounterDictionary.Empty.Increment(cluster, counterKey), Consistency.writeLocal)
with a new PNCounterDictionary
(containing single key). I expect it to be merged with the previous map.
The questions are:
- am I doing updating right?
- if I am, is it a bug?
Also, it would be very useful if some ddata usage examples were added in form of docs or example snippets.
C# DData DSL works:
async {
for n in 1..10_000_000 do
let counterKey = string (n % 10)
ddata.Replicator.Tell(
Dsl.Update(
key,
PNCounterDictionary.Empty,
Consistency.writeLocal,
fun existing -> existing.Increment(cluster, counterKey)))
do! Async.Sleep 500
}
function
| Tick ->
ActorTaskScheduler.RunTask (fun () ->
async {
let! counters = ddata.AsyncGet(key, Consistency.readLocal)
ctx.Self <! GotCounter counters
}
|> Async.StartAsTask
|> fun x -> x :> Task)
ignored()
| GotCounter counters ->
match counters with
| Some counters ->
counters.Entries
|> Seq.map (fun (KeyValue(k, v)) -> sprintf "%s -> %O" k v)
|> String.concat "\n"
|> logInfof ctx "Counters!\n%s"
| None ->
logInfof ctx "Got null counters"
ctx.Schedule (TimeSpan.FromSeconds 1.0) ctx.Self Tick |> ignore
ignored()
[INFO][05-Jan-18 12:51:20][Thread 0003][[akka://TestSystem/user/$a#1434317428]] Counters!
5 -> 65
4 -> 65
7 -> 65
6 -> 65
1 -> 65
0 -> 64
3 -> 65
2 -> 65
9 -> 64
8 -> 64
Not 100% sure it's related but found interesting behaviour on the ORSet with lmdb enabled. The AsyncGet (and GetAsync directly - same story) doesn't load the saved state from the db but AsyncGet followed by AsyncUpdate (with returned state) followed by AsyncGet gets everything loaded.
ORMultiMap has the same behaviour - AsyncGet doesn't read the storage until AsyncUpdate. Do I need to send some kind of a warm-up message to the replicator maybe? Sorry if I'm missing something obvious. Running it all with the latest Akka beta