WIP: Shared plasma
A pyarrow.plasma-based shared memory model for multiple workers on a single node. Objects whose pickle (protocol 5) serialisation produces out-of-band buffers will have any buffer larger than a configurable size moved to plasma, and deserialisation views those buffers in shared memory (no copy). This is particularly good when scattering a large object to many/all workers.
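For anyone skimming, here is a rough sketch of the mechanism (not the actual code in this PR; the store socket path, the size threshold and the frame tagging are placeholders):

```python
import os
import pickle

import pyarrow.plasma as plasma

THRESHOLD = 1_000_000                    # "configurable size" placeholder
client = plasma.connect("/tmp/plasma")   # assumes a plasma store is already running

def dumps_shared(obj):
    """Pickle ``obj`` with protocol 5, diverting large out-of-band buffers to plasma."""
    pickle_buffers = []
    header = pickle.dumps(obj, protocol=5, buffer_callback=pickle_buffers.append)
    frames = []
    for pb in pickle_buffers:
        view = pb.raw()                                  # zero-copy view of the buffer
        if view.nbytes >= THRESHOLD:
            oid = plasma.ObjectID(os.urandom(20))
            target = client.create(oid, view.nbytes)     # allocate in shared memory
            memoryview(target).cast("B")[:] = view       # single copy into plasma
            client.seal(oid)                             # make visible to other processes
            frames.append(("plasma", oid.binary()))      # ship only the 20-byte id
        else:
            frames.append(("raw", bytes(view)))          # small buffers travel inline
    return header, frames

def loads_shared(header, frames):
    """Rebuild the object; plasma-backed buffers are viewed, not copied."""
    buffers = []
    for kind, payload in frames:
        if kind == "plasma":
            [buf] = client.get_buffers([plasma.ObjectID(payload)])
            buffers.append(buf)                          # read-only view of shared memory
        else:
            buffers.append(payload)
    return pickle.loads(header, buffers=buffers)
```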
Any thoughts, before making this more complete?
Closes #xxxx
- [ ] Tests added / passed
- [ ] Passes `pre-commit run --all-files`
Unit Test Results
- 15 files ±0, 15 suites ±0, 6h 15m 21s :stopwatch: -16m 9s
- 2 838 tests +2: 2 753 :heavy_check_mark: -1, 83 :zzz: +2, 2 :x: +1
- 21 026 runs +10: 20 074 :heavy_check_mark: +3, 950 :zzz: +6, 2 :x: +1
For more details on these failures, see this check.
Results for commit 0bdc072e. ± Comparison against base commit 6d85a853.
@jrbourbeau , I don't remember who you suggested would be interested in discussing this idea.
I think @crusaderky @jakirkham @gjoseph92, among others, might find this interesting
Yeah this is interesting.
Another option I was looking at previously was using mmap with Zict spilling ( https://github.com/dask/zict/pull/51 ), which would allow for a lot of the same memory sharing between workers (since mmap'd files use memory in kernel space that is shared across processes by default).
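As a toy illustration of why this helps (not the code in that PR; the spill path is made up), two processes mapping the same spilled file share the underlying pages:

```python
# Toy illustration: two processes mapping the same spilled file read the same
# physical pages, so the OS only keeps one copy of the data in memory.
import mmap

SPILL_PATH = "/tmp/dask-spill-demo"      # hypothetical spill file

def spill(payload: bytes) -> None:
    with open(SPILL_PATH, "wb") as f:
        f.write(payload)

def load_view() -> memoryview:
    f = open(SPILL_PATH, "rb")
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    return memoryview(mm)                # no copy; pages come from the shared page cache
```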
cc @quasiben
Thanks for trying this out @martindurant! I've wanted to do this for a long time. A couple general questions:
- To me, the main reason to use plasma is the optimization it offers for transferring buffers over the network. This would also replace a lot of our complicated, bug-prone data-fetching logic with an efficient standalone subcomponent. Are you trying to do inter-worker communication here?
- Plasma is deprecated: https://github.com/apache/arrow/issues/13195.
- Given all that, if you're not going to use plasma for inter-worker communication, what's the benefit to using plasma as opposed to just writing buffers to shared memory directly? I imagine the main one is ObjectID reference counting and garbage collection?
Had held off asking whether we should generally consider using sharded key-value stores, but given Gabe's comment above am thinking it is now appropriate. Have we considered using a sharded key-value store for spilling/inter-worker communication?
Serialization would amount to handing a key and some buffers over to the store after any operation, and retrieving them from any worker as needed. This would make the data requestable by any worker (including the one that stored it, though we could shortcut that case). It could also be more robust, as it would be easier to recover state when restarting or after losing a worker. For CSPs that have some builtin key-value store support, we could leverage that directly.
Anyways something to think about :)
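Very roughly, the interface I have in mind would look something like this (all names hypothetical):

```python
# Hypothetical interface for the key-value-store idea above; all names are made up.
from typing import Protocol, Sequence

class ShardedKVStore(Protocol):
    def put(self, key: str, frames: Sequence[bytes]) -> None:
        """Hand the key and serialized frames to the store after a task finishes."""

    def get(self, key: str) -> Sequence[memoryview]:
        """Fetch frames from any worker (shortcutting when the data is local)."""

    def delete(self, key: str) -> None:
        """Release once the scheduler forgets the key; also aids recovery/restart."""
```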
Are you trying to do inter-worker communication here?
This is solely aimed at inter-worker and client-worker comms on a single node.
Deprecated
!? It doesn't say so at the head of https://arrow.apache.org/docs/python/plasma.html !! This was supposed to be a standout feature of the arrow ecosystem.
what's the benefit to using plasma
Yes, the idea was not to have to build another reference-counter, but stick to trusted technology.
Zict spilling
I did consider where in the stack this was most appropriate. My thinking is that many results in a worker's data/memory manager are only ever accessed by subsequent tasks on the same worker, so we should only move/copy to shm when we know it will be useful, and we only know this when serialisation is required.
@jakirkham , would be happy to consider the idea, I don't have an immediate picture of how it looks.
This was supposed to be a standout feature of the arrow ecosystem
I understood it as something that the Ray folks built on top of pyarrow just for Ray, and then decided to upstream. The way Ray has used arrow and plasma has changed considerably over time, so I'm not too surprised if nobody else is maintaining it now (generic object serialization had exactly this lifecycle). I wouldn't be surprised if eventually Plasma moved into the ray project (and maybe even became an implementation detail without a public API). I agree the deprecation seems a bit unclear and undocumented, just saying it doesn't surprise me.
I never thought of plasma as a standout feature of the ecosystem, but rather the ability to implement something like plasma yourself was the point.
Zict spilling
I did consider where in the stack this was most appropriate. My thinking is that many results in a worker's data/memory manager are only ever accessed by subsequent tasks on the same worker, so we should only move/copy to shm when we know it will be useful, and we only know this when serialisation is required.
@jakirkham , would be happy to consider the idea, I don't have an immediate picture of how it looks.
Yeah that's fair. Limiting to when communication is needed or memory is overwhelmed is probably good enough. One of the (admittedly obvious) benefits of shared memory is that if multiple workers on the same node need the same object in memory, we only need to pay that cost once. So moving the data to an mmap'd file before sending would keep memory usage more consistent.
Have a rough sketch of this from a couple months back. Though it's likely missing things. Added to this draft PR ( https://github.com/dask/distributed/pull/6516 ). Some things we may want to build on top of that (though need not be in that PR) include sending serialized data ( https://github.com/dask/distributed/issues/5900 ) (maybe just adding custom serialization for mmap'd files would be enough?). Using sendfile (or similar when unavailable) to send data between workers without needing to load it in memory immediately ( https://github.com/dask/distributed/issues/5996 ).
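For the sendfile piece, a hypothetical helper (not code from either issue) might be as simple as:

```python
# Hypothetical helper: stream a spilled file over a worker's socket without
# first loading it into Python memory.
import socket

def send_spilled(sock: socket.socket, path: str) -> int:
    with open(path, "rb") as f:
        return sock.sendfile(f)   # uses os.sendfile on Linux; falls back to send() otherwise
```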
Also should add am not attached to this or any other approach. Just think there is some benefit to leveraging shared memory on nodes. Think you and I have discussed this in several issues over the years. So happy with any improvement in this space 🙂
@jakirkham , your mmap thing certainly has the advantage of being a really small change! I would say that there is no conflict with the kind of idea here, and I wonder whether yours has any downside. Still, it's a different use case.
Here is a tangentially interesting article on how fast raw throughput in IPC pipes can be on linux: https://mazzo.li/posts/fast-pipes.html , which is yet another part of the puzzle.
cc @crusaderky
Perhaps some of the people in this thread can have a quick brainstorm after tomorrow's dev team meeting or later in the week. I do believe that this is an important optimisation for part of our user base, and apparently relatively simple to implement in a number of different ways. So we just need to agree on the best approach(es).
Feel free to send me an invite :)
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
- 15 files ±0, 15 suites ±0, 6h 30m 57s :stopwatch: +6m 11s
- 3 231 tests +5: 3 142 :heavy_check_mark: +3, 84 :zzz: +1, 5 :x: +1
- 23 887 runs +35: 22 943 :heavy_check_mark: +4, 926 :zzz: +17, 18 :x: +14
For more details on these failures, see this check.
Results for commit 2cfb807f. ± Comparison against base commit cff33d50.
:recycle: This comment has been updated with latest results.
Created https://github.com/orgs/dask/projects/3/views/1 with very draft project plan for shm. All thoughts appreciated!
Now does worker-worker too (which is the important case!).
Notes:
- required passing the serialisation settings through to the workers in LocalCluster. This should probably be reverted, with the serialisers defined only in configuration
- current state of this PR includes a lot of prints to follow the data flow; of course these are not to be included in any final version
Thanks for diving into this, @martindurant, it looks really interesting!
To follow up on @gjoseph92's comment regarding Plasma being deprecated: according to this thread on the Arrow dev mailing list, it appears as though no one is actively working on Plasma within Arrow. Just from browsing the modification dates in the apache/arrow repo, it also looks like the Plasma code has not been touched in months, if not years.
Another thing to consider might be LMDB. Think there is support for LMDB in Zict.
Specifically this, though: https://lmdb.readthedocs.io/en/release/#buffers ; we want memoryviews, not bytes objects.
However:
In both PyPy and CPython, returned buffers must be discarded after their producing transaction has completed or been modified in any way. To preserve buffer’s contents, copy it using bytes():
but this seems to be in the context of possible writes.
Right we do that in Zarr's LMDBStore, but I don't think Zict is doing this currently. It's easy enough to add though
We could use toreadonly() to make sure all memoryviews returned are immutable
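Something like this sketch (not current Zict code; the path and map size are placeholders):

```python
# Sketch only; path and map_size are placeholders.
import lmdb

env = lmdb.open("/tmp/dask-lmdb", map_size=2**30)

def put(key: bytes, payload: bytes) -> None:
    with env.begin(write=True) as txn:
        txn.put(key, payload)

def get_view(key: bytes):
    txn = env.begin(buffers=True)                  # buffers=True -> zero-copy views into the map
    view = memoryview(txn.get(key)).toreadonly()   # immutable view for callers
    return view, txn                               # keep the txn alive while the view is in use
```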
Agreed, LMDB looks like just the thing if we can get around the invalid memory thing (looks like probably yes); but the point of course was to benchmark to see which is best!
Hi folks, I would like to introduce our project vineyard ( https://github.com/v6d-io/v6d, a CNCF sandbox project) to the Dask community.
Vineyard is a shared memory engine for distributed computing. Like plasma, vineyard provides a shared memory abstraction for connected IPC clients (see https://v6d.io/notes/getting-started.html), but unlike plasma, vineyard doesn't require objects to be fully serialized before they are put into the store; instead, it introduces a builder/resolver framework that lets clients store metadata and blobs separately, enabling efficient zero-copy sharing between computing engines in different languages (see more details about the idea in https://v6d.io/notes/objects.html#).
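For a quick flavour of the client API (adapted from the getting-started docs above; the socket path depends on how the local vineyardd instance was started):

```python
# Minimal client-side flavour; the socket path is a placeholder.
import numpy as np
import vineyard

client = vineyard.connect("/var/run/vineyard.sock")
object_id = client.put(np.arange(1_000_000))   # builder writes metadata + blobs to shared memory
shared = client.get(object_id)                 # resolver gives a zero-copy view in another client
```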
We haven't compared vineyard's performance with plasma yet, but it is in our plan. If possible, I would like to bring vineyard into the shared-memory experiments in Dask and see if it could be helpful.
The integration of plasma looks quite concise in this PR compared to my previous experiment with vineyard. I will add vineyard to my own fork first and try it with @martindurant's ongoing shm benchmarks.
Actually, we worked on some experiments integrating dask with vineyard for a while, but stopped the investigation after reading some previous discussion about introducing shared memory to dask (e.g., https://github.com/dask/distributed/issues/4497) and assuming shared memory wouldn't be considered in dask. Thanks @martindurant for diving into such a feature, it looks really exciting!
Thanks @sighingnow for getting in touch. I was aware of vineyard via discussions in Ray about using it instead of or alongside plasma, but this PR was already underway by then. You will also see that I have added an implementation based on LMDB in the most recent commits, so comparing and contrasting against other frameworks is very much in my plan, and it's great to have someone knowledgeable in this mix! This PR will not be accepted as it stands, but we can use this as a space to push code for testing/benchmarking - I'd be happy for you to make PRs to this branch rather than make a separate branch, if you prefer. I must admit I don't understand how you can share objects without some serialisation, but I'm prepared to be educated.
This PR will not be accepted as it stands, but we can use this as a space to push code for testing/benchmarking
Will submit a PR against the martindurant/distributed:shared_plasma branch.
I must admit I don't understand how you can share objects without some serialisation
I would rather describe it as "lightweight serialization" to make it more clear, as "without serialisation" in my previous comments is a bit misleading. It works like pickle5 in the sense of decoupling metadata and payloads (when you serialize an object, e.g. a numpy.ndarray, with pickle5, you receive a small buffer with some meta information plus out-of-band buffers for the binary payload).
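For comparison, plain pickle5 already gives you this split between a small metadata frame and out-of-band payload buffers:

```python
# Plain pickle5 (nothing vineyard-specific): metadata and payload are separate.
import pickle
import numpy as np

arr = np.arange(1_000_000)
buffers = []
meta = pickle.dumps(arr, protocol=5, buffer_callback=buffers.append)
# `meta` is a small description of the array; `buffers` holds one out-of-band
# PickleBuffer wrapping the array's memory (no copy has happened yet).
roundtrip = pickle.loads(meta, buffers=buffers)
```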
In vineyard, we organize objects into metadata and payload buffers as well, but the metadata is organized as a JSON tree so it can be exchanged between different languages easily.
Besides, we have implemented a client-side allocator that can allocate memory directly in the shared memory. The client-side allocator can be configured as the array data allocator in numpy (see also https://numpy.org/neps/nep-0049.html); then, when we decide to put a numpy.ndarray into vineyard, we can skip the copy, as the data has already been placed in the shared memory arena. Note that unlike create_buffer/release_buffer, the malloc/free in the client-side allocator won't trigger a request/response roundtrip over the IPC socket to the server.
The allocator API hasn't been polished for python but has been used in other C++ engines. I think it could bring added value to Dask as well: if the cost of "copying to shared memory" can be avoided, worker-to-worker communication on the same node becomes really cheap.
It might be worth looking at using Dask's builtin serialization directly as it also generates metadata and out-of-band buffers (instead of only relying on pickle protocol 5). We use MessagePack with the metadata currently, but this may also work with JSON (untested) possibly with some tweaks.
Ah, yes indeed, it sounds like vineyard has something similar to dask's serialization system - which would be attractive if we didn't already have one.
The ability to directly allocate into the shared memory is a pretty big deal, though, and certainly not done by the plasma or lmdb implementations. It makes me wonder what are the downsides to allocating all buffers in shared memory, aside from a difficulty getting the real memory footprint of a process.
It might be worth looking at using [Dask's builtin serialization](https://distributed.dask.org/en/stable/serialization.html#serialization) directly
I have noticed that. The data serialization protocol should be orthogonal to using vineyard as the underlying shared memory store, I think. I will give it a try.
It makes me wonder what are the downsides to allocating all buffers in shared memory, aside from a difficulty getting the real memory footprint of a process.
Allocating all buffers in shared memory requires cooperation from the serializer and deserializer, i.e., the serializer needs to record the offset from the base address of the shared memory arena, and the deserializer needs to be able to translate that offset into an address in its own process's address space.
Actually, this "offset" is what plasma and vineyard expose as the "object id".
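The translation step looks roughly like this with Python's standard-library shared memory (plasma and vineyard hide it behind the object id; the name, offset and cleanup are simplified here):

```python
# Rough illustration with the stdlib; the descriptor plays the role of an object id.
import numpy as np
from multiprocessing import shared_memory

# producer: write into the shared arena and record (name, offset, dtype, shape)
shm = shared_memory.SharedMemory(create=True, size=2**20, name="shm-demo")
offset = 4096
src = np.arange(1000, dtype="i8")
shm.buf[offset:offset + src.nbytes] = src.tobytes()
descriptor = ("shm-demo", offset, "i8", (1000,))

# consumer (another process, another address space): translate the descriptor back
name, off, dtype, shape = descriptor
shm2 = shared_memory.SharedMemory(name=name)
view = np.ndarray(shape, dtype=dtype, buffer=shm2.buf, offset=off)  # zero-copy view
```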
From our experience integrating vineyard with other computing engines (e.g. an online graph store engine), we use a JSON tree that includes all blob ids of a graph data structure, pass the JSON to another computing process, and restore the graph data structure there from the JSON (and the referenced blobs).
It makes me wonder what are the downsides to allocating all buffers in shared memory, aside from a difficulty getting the real memory footprint of a process.
It also introduces the constraint that a blob (allocated memory block) needs to be freed by the allocator in its creator process, as the allocator keeps internal state. It works in "single writer - multiple (read-only) readers" settings, so if freeing objects created by another worker is not allowed in dask, it should be applicable as well.
See very preliminary results at https://github.com/martindurant/shm-distributed#readme (and the code that made those in the same repo). Someone can tell me what I'm doing wrong in the choice of workflows or how I've implemented them. This is running against the current state of this branch.
Thanks Martin 🙏
Raised a couple ideas as issues:
- https://github.com/martindurant/shm-distributed/issues/1
- https://github.com/martindurant/shm-distributed/issues/2