
Improve Materialiser cache and its GC mechanism

Open aletomsic opened this issue 9 years ago • 7 comments

Current Implementation:

There are two per-key caches: an operation cache, where the most recent operations are stored, and a snapshot cache, which stores the latest materialised views (snapshots) of a CRDT. When the materialiser receives an operation, it stores it in its operation cache. When it receives a read request, it checks whether there are suitable materialised views available in the snapshot cache and whether there are operations to be applied in the operation cache. If so, it materialises a new version of the CRDT that satisfies the request by applying the potentially missing operations to the corresponding snapshot, thus creating a new snapshot. In the event that cached snapshots or operations exist for a requested key, but none satisfies the read request parameters (i.e. snapshot time), the materialiser replies with an {error, no_snapshot} message.
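For illustration, a minimal sketch of this read path, assuming a counter CRDT where snapshots are {Time, Value} pairs and operations are {Time, {increment, N}}; the module and function names are hypothetical, not the actual antidote API:

```erlang
%% Hypothetical sketch of the materialiser read path for a counter CRDT.
-module(materialiser_read_sketch).
-export([read/3]).

read(SnapshotTime, CachedSnapshots, CachedOps) ->
    %% Candidate base snapshots: those not newer than the requested time.
    case [S || {T, _} = S <- CachedSnapshots, T =< SnapshotTime] of
        [] ->
            %% Nothing in the cache can satisfy this read.
            {error, no_snapshot};
        Candidates ->
            %% Take the most recent usable snapshot as the base.
            {BaseTime, BaseValue} = lists:max(Candidates),
            %% Apply the cached ops that the base snapshot is missing.
            Missing = [Op || {T, _} = Op <- CachedOps,
                             T > BaseTime, T =< SnapshotTime],
            NewValue = lists:foldl(fun({_T, {increment, N}}, Acc) -> Acc + N end,
                                   BaseValue, Missing),
            %% The resulting snapshot would then be added to the snapshot cache.
            {ok, {SnapshotTime, NewValue}}
    end.
```

For example, read(5, [{1, 0}, {3, 10}], [{4, {increment, 2}}, {6, {increment, 7}}]) picks the snapshot taken at time 3, applies the increment at time 4, skips the one at time 6, and returns {ok, {5, 12}}.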

GC

As there are both object views (snapshots) and operations, the growing number of either of them needs to trigger the GC mechanism.

  1. Snapshot (or read-triggered) GC: In case the number of snapshots stored for a key grows up to a value “snapshot_threshold”, keep just the latest “snapshot_min” snapshots. Proposed initial values (for no particular reason) are 10 and 2 respectively. Note that snapshot generation is done on reads, which is why this mechanism is triggered only on reads.
  2. Operation (or write-triggered) GC: In case the number of update operations stored for a key grows up to a number “ops_threshold”, keep just the latest “ops_min” operations by creating a snapshot of the object, applying and then removing the oldest ops_threshold - ops_min ops. Proposed initial values (for no particular reason) are 50 and 5 respectively. Note that the snapshot generation could possibly trigger the snapshot GC mechanism. (A sketch of both mechanisms follows this list.)
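A minimal sketch of both thresholds, assuming per-key caches are plain lists, operations ordered oldest-first, snapshots newest-first, and an ApplyFun that folds one operation into a snapshot; all names and constants here are illustrative, not antidote's actual code:

```erlang
-module(materialiser_gc_sketch).
-export([maybe_gc_ops/3, maybe_gc_snapshots/1]).

-define(OPS_THRESHOLD, 50).
-define(OPS_MIN, 5).
-define(SNAPSHOT_THRESHOLD, 10).
-define(SNAPSHOT_MIN, 2).

%% Write-triggered GC: once ?OPS_THRESHOLD operations are cached for a key,
%% fold the oldest ops into a fresh snapshot and keep only the ?OPS_MIN newest.
maybe_gc_ops(Ops, Snapshot, ApplyFun) when length(Ops) >= ?OPS_THRESHOLD ->
    {ToApply, ToKeep} = lists:split(length(Ops) - ?OPS_MIN, Ops),
    NewSnapshot = lists:foldl(ApplyFun, Snapshot, ToApply),
    {NewSnapshot, ToKeep};
maybe_gc_ops(Ops, Snapshot, _ApplyFun) ->
    {Snapshot, Ops}.

%% Read-triggered GC: once ?SNAPSHOT_THRESHOLD snapshots exist for a key,
%% keep only the ?SNAPSHOT_MIN most recent ones.
maybe_gc_snapshots(Snapshots) when length(Snapshots) >= ?SNAPSHOT_THRESHOLD ->
    lists:sublist(Snapshots, ?SNAPSHOT_MIN);
maybe_gc_snapshots(Snapshots) ->
    Snapshots.
```

Note that the new snapshot returned by maybe_gc_ops is exactly the point where the write-triggered GC could in turn trip the snapshot threshold.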

Please contribute to the following with your ideas:

Future Caching

The idea is to generate some policies defining how each key will be cached, depending on its type or maybe its size. It might even be possible to change the policy dynamically, e.g. according to an element’s size. The goal is to achieve more efficient handling of the cache, in terms of both size and speed in handling requests. There should be an implementation of the materialiser read and update functions for each policy.
NoOps: for an object cached under this policy, no operations are stored, only snapshots. This policy could be particularly handy for CRDTs whose state is small compared to the size of an operation, e.g. counters, where the cost in size of storing the value (a snapshot) is similar to that of storing the operation. In this case, a snapshot should be created for each update; on read, the snapshots would be ready to serve requests. This would improve both size and read time for counter-like CRDTs.

DeltaCheckpoints: the snapshot cache would hold full snapshots and deltas (from those snapshots). There have to be parameters such as how many full snapshots and how many “delta snapshots” are stored. This could be useful for big sets.

Ops&Snaps: the current implementation. (A sketch of per-policy update handling follows this list.)
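A rough sketch of how a per-key policy could be dispatched on the update path; the policy atoms, the {Snapshots, Ops} cache shape, and ApplyFun are assumptions for illustration, not an existing antidote interface:

```erlang
-module(cache_policy_sketch).
-export([handle_update/4]).

%% Per-key cache state is {Snapshots, Ops}, both newest-first.
%% ApplyFun folds one operation into a snapshot value.
handle_update(no_ops, Op, ApplyFun, {[Latest | _] = Snapshots, _Ops}) ->
    %% NoOps: apply every update eagerly; only snapshots are cached.
    %% (Assumes at least one snapshot already exists for the key.)
    {[ApplyFun(Op, Latest) | Snapshots], []};
handle_update(ops_and_snaps, Op, _ApplyFun, {Snapshots, Ops}) ->
    %% Ops&Snaps (the current implementation): just cache the operation;
    %% a snapshot is only materialised later, on read or on GC.
    {Snapshots, [Op | Ops]};
handle_update(delta_checkpoints, Op, _ApplyFun, {Snapshots, Deltas}) ->
    %% DeltaCheckpoints: keep the op as a delta against the latest full
    %% snapshot; a new full snapshot would be cut after enough deltas.
    {Snapshots, [Op | Deltas]}.
```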

GC

Also, the idea here is to implement GC policies. The following are possible ideas:

NumberLimit: the current implementation. If the number of snapshots or operations in the cache for a given key grows to a certain threshold, perform GC.

CacheSize: when the overall cache reaches a memory limit, trigger GC.

TimeBased: perform GC periodically, at a fixed interval.

ActiveSnapshot: all partitions should periodically exchange the latest snapshot time an active tx took. That way, a partition can GC all state which is older than the ActiveSnapshot (see the sketch below). In order to implement this, we would need an efficient broadcast mechanism. Could this possibly be piggybacked with group membership information in riak_core?
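A minimal sketch of the ActiveSnapshot idea, assuming each partition reports the latest snapshot time taken by one of its active transactions, and using plain integers for snapshot times (antidote would use vector clocks):

```erlang
-module(gc_policy_sketch).
-export([active_snapshot_gc/2]).

%% ReportedTimes: one snapshot time per partition, as received via the
%% periodic exchange. Anything strictly older than their minimum can no
%% longer be requested by an active transaction, so it can be collected.
active_snapshot_gc(ReportedTimes, CachedVersions) ->
    GcBound = lists:min(ReportedTimes),
    [V || {Time, _Value} = V <- CachedVersions, Time >= GcBound].
```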

aletomsic avatar Nov 25 '14 12:11 aletomsic

For information, I did some tests on antidote to see the impact of the materialiser. All experiments were done using counter CRDT types, 2000 keys, and single-operation transactions performing either a read or an update:


[figure: summary_overall]

This first benchmark is single DC, eight antidote machines, and two workload generation machines each running 80 client threads. Average latencies for reads are around 1 - 2 ms, updates around 3 - 4 ms.


[figure: summary_overall]

This second benchmark is the same as the previous one, except without the materialiser. By "without the materialiser" I mean that no lists of versioned snapshots are created and no lists of updates are calculated and kept in memory. So it is more like a single-version database. Everything else is the same: all updates are still logged, all messages sent, locks, etc. What we see here is just a small increase in performance, so it seems like the materialiser is doing a good job, and since it is single DC, most of the time we are likely just dealing with the most recent snapshot (i.e. we are not really using the multi-versioning anyway).


[figure: summary_overall]

This graph shows what happens when we increase the number of DCs to 3. Each update is replicated 3 times; there is a total of 24 antidote machines (8 per DC) and 6 workload generation machines (2 per DC). What we see here is a huge performance drop as soon as we increase the update rate. Average read latencies go up to around 5 - 10 ms, updates between 10 and 100 ms. In the 99% update workload we see around 5000 ops/sec.


[figure: summary_overall]

This is the same 3 DC test as above, except without the materialiser. Again we are still doing the same amount of logging, network traffic, locks, etc. We are just avoiding the multi-versioning and materialisation. Maybe the first thing to notice is that Antidote scales more as expected: we get 3x the read throughput of the single DC and around 2x the write throughput (since each update is replicated 3x we are actually performing 6x more work, but most of it asynchronously). Average latencies for reads are around 1 - 2 ms, updates around 3 - 4 ms.


So then the question is: why does the materialiser work well for 1 DC and so poorly for 3 DCs? Well, it appears to be the tradeoff of multi-versioning and staleness. In the 1 DC case, we are pretty much always operating on the latest version of the object, as our snapshot is based on our local physical clock. With multiple DCs we are accessing older versions based on the stable time, which can vary from node to node, and are using a vector clock to find and calculate the version (i.e. there could be a large number of versions). This still shouldn't account for a 20x difference in performance, so I plan to look deeper into the issue (how many snapshots we are creating, whether we go to the log, etc.), but I see two possible issues: first, we risk over-engineering trying to choose the proper cache size for the number of DCs/update rate, and second, there isn't really much support in Erlang for the kind of shared data structures that these multi-version objects really want to be.
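To make the vector-clock part concrete, here is a small sketch of what the version lookup amounts to in the multi-DC case (vector clocks as maps of DC -> time; all names and data shapes are assumptions, not the antidote code path):

```erlang
-module(vc_version_sketch).
-export([pick_version/2]).

%% Find the newest cached version whose vector clock is dominated by the
%% requested snapshot vector clock. Versions are assumed newest-first.
pick_version(SnapshotVC, Versions) ->
    case [V || {VC, _Value} = V <- Versions, leq(VC, SnapshotVC)] of
        []           -> {error, no_snapshot};
        [Newest | _] -> {ok, Newest}
    end.

%% VC1 =< VC2 componentwise: every DC entry in VC1 is covered by VC2.
leq(VC1, VC2) ->
    maps:fold(fun(Dc, T1, Acc) -> Acc andalso T1 =< maps:get(Dc, VC2, 0) end,
              true, VC1).
```

With a single DC the clocks have one entry and this degenerates to a scalar comparison; with several DCs every candidate version is compared entry by entry, which is part (though, as said, probably not all) of the extra cost.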

tcrain avatar Dec 02 '15 14:12 tcrain

Tyler, thanks for the extensive testing; I know how hard it is, and how essential.

See comment below.

If I follow you correctly, the materialiser is trying to eagerly compute lots of versions without knowing whether they will ever be useful.

The concept of the materialiser is to serve as a cache. It is always possible to compute any value from the log. So for me, the materialiser should be lazy and compute a materialised version only when necessary, i.e., when a client requests it.

If it turns out that this approach adds too much latency, it is always possible to add some kind of more pro-active daemon, but it should not be wired into the materialiser.

Isn’t this what Santiago is working on?

                Marc

marc-shapiro avatar Dec 02 '15 16:12 marc-shapiro

I guess at this point I haven't investigated closely enough to tell exactly what is taking the most time, so the eagerness/laziness of generating snapshots is definitely something we need to look at.

The way it is implemented now is that within a local DC we always read before performing an update (to generate the downstream [but of course this is not necessary for all CRDT types]). So local updates do generate a snapshot.

In contrast, when updates that are propagated from other DCs arrive, no snapshot is generated. These are only used in a snapshot when a new update or read is performed locally. So I guess it is working in the way you imagine, i.e. only snapshots that are needed are generated (but this could be optimised for certain objects).

I guess I was more alluding to the point that even if we are only creating useful snapshots, we still have to create lots of them and do a lot of work to do it. So the first thing I was trying to say is that this is a problem that we have to solve, and it is an overhead that we have to pay compared to a database that uses simpler versioning. And the second part is that in my mind the solution to this problem is much better adapted to being solved using shared memory. I don't think this is a fault of Erlang, but more that what we are doing is sandwiching the functional/message-passing part of Erlang between two shared memory abstractions. On the bottom we have the shared memory abstraction provided by the hardware, and on the top we have this multi-versioned CRDT key-value store, which seems to have a lot of likeness with the hardware below [this is only for the materialiser and parts surrounding it; I think Erlang works great for the rest of the protocol].

So it would seem nice to perform all this logging and object generation and multi-versioning together, closer to the hardware, especially if we end up using something like leveldb where we have to pay the price of going back and forth from the Erlang VM anyway. Santiago is sort of working on this, but he was more looking at the structures on disk; still, since the disk and memory will be somehow projections of each other, we should solve these problems together.

tcrain avatar Dec 02 '15 17:12 tcrain

Tyler, you are doing indeed an awesome and difficult job here - thank you very much!

In contrast, when updates that are propagated from other DCs arrive, no snapshot is generated. These are only used in a snapshot when a new update or read is performed locally. So I guess it is working in the way you imagine, i.e. only snapshots that are needed are generated (but this could be optimised for certain objects).

We had discussed this issue previously (in May): the snapshot generation and update caching should be done type-specifically, i.e. sets should not generate snapshots with every add since this uses a lot of space (and time), while counters should be able to apply blind updates.

I guess I was more alluding to the point that even if we are only creating useful snapshots, we still have to create lots of them and do a lot of work to do it. So the first thing I was trying to say is that this is a problem that we have to solve, and it is an overhead that we have to pay compared to a database that uses simpler versioning. And the second part is that in my mind the solution to this problem is much better adapted to being solved using shared memory. I don't think this is a fault of Erlang, but more that what we are doing is sandwiching the functional/message-passing part of Erlang between two shared memory abstractions. On the bottom we have the shared memory abstraction provided by the hardware, and on the top we have this multi-versioned CRDT key-value store, which seems to have a lot of likeness with the hardware below [this is only for the materialiser and parts surrounding it; I think Erlang works great for the rest of the protocol].

I tend to not agree. Referential transparency is not expensive per se. Usually the overhead stems from the garbage collection that it requires. I don’t know specifically for Erlang, but in its performance-critical libraries Haskell uses several optimizations under the hood, such as in-place updates, sharing of values, etc. I wouldn’t be surprised if this is also true for Erlang’s core parts (which are implemented in C…).

So it would seem nice to perform all this logging and object generation and multi-versioning together, closer to the hardware, especially if we end up using something like leveldb where we have to pay the price of going back and forth from the Erlang VM anyway. Santiago is sort of working on this, but he was more looking at the structures on disk; still, since the disk and memory will be somehow projections of each other, we should solve these problems together.

I would strongly advise against coupling the materializer and the logging too closely. We will end up with an unmaintainable chunk of code that has bugs that are very hard to locate, and that introduces bottlenecks and issues due to synchronization, etc.

Any other opinions?

Annette

bieniusa avatar Dec 03 '15 09:12 bieniusa

I agree with Annette on the importance of separating mechanism and policy.

I just had a conversation with Tyler. My advice: (1) make sure to identify the source of the bottleneck before trying to fix it -- currently we have a hypothesis but not enough evidence. (2) Rewriting the materialiser in C and interfacing it properly with Erlang is likely to be a manpower sink, so some kind of a cost/benefit thinking is appropriate. (3) Consider some simple fixes first: will a per-client or per-server Zipfian distribution help (likely yes, since the materialiser will not need to incorporate as many foreign updates)? How about batching and centralising the computation of the foreign part of the stable snapshot, i.e. separating read-my-writes (high priority) from foreign ones (tunable priority)?

                                Marc


marc-shapiro avatar Dec 03 '15 10:12 marc-shapiro

As anyone who has worked with native functions in Erlang knows, re-writing in C could cause more harm than good if you don't know how to do it properly. NIFs are incredibly difficult to get right: you must call into the scheduler and record reduction counts properly, as well as yield control and ensure your code is re-entrant; otherwise the preemptive scheduler will unfairly balance your native code vs. your Erlang code.

cmeiklejohn avatar Dec 03 '15 18:12 cmeiklejohn

@tcrain did these measurements take place before your changes to the materializer? I believe PhysiCS might be the "antidote" to this problem.

aletomsic avatar Dec 08 '16 11:12 aletomsic