WIP: Shared plasma

Open martindurant opened this issue 3 years ago • 32 comments

A pyarrow-plasma based shared memory model for multiple workers on a single node. Objects that produce buffers with pickle V5 will have those buffers moved to plasma if larger than a configurable size, and deserialised by viewing those buffers (no copy). This is particularly good when scattering a large object to many/all workers.
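
Sketching the idea for concreteness (this is not the PR's actual code; the threshold value, frame layout, and helper names are hypothetical): pickle protocol 5 exposes out-of-band buffers via buffer_callback, large buffers are copied once into the plasma store, and the receiving process reconstructs the object from zero-copy views of those same plasma segments.

```python
import os
import pickle

import pyarrow.plasma as plasma  # note: deprecated upstream, see discussion below

THRESHOLD = 1_000_000  # hypothetical "configurable size" cutoff, in bytes


def dumps_shared(obj, client):
    """Pickle with protocol 5; large out-of-band buffers go into the plasma store."""
    oob = []
    meta = pickle.dumps(obj, protocol=5, buffer_callback=oob.append)
    frames = []
    for pb in oob:
        view = pb.raw()  # 1-D contiguous memoryview of the out-of-band buffer
        if view.nbytes >= THRESHOLD:
            oid = plasma.ObjectID(os.urandom(20))
            target = client.create(oid, view.nbytes)
            memoryview(target).cast("B")[:] = view  # single copy into shared memory
            client.seal(oid)
            frames.append(("plasma", oid.binary()))
        else:
            frames.append(("inline", view.tobytes()))
    return meta, frames


def loads_shared(meta, frames, client):
    """Rebuild the object; plasma-backed frames are zero-copy views on this side."""
    buffers = []
    for kind, payload in frames:
        if kind == "plasma":
            [buf] = client.get_buffers([plasma.ObjectID(payload)])
            buffers.append(buf)  # pyarrow Buffer viewing the shared segment
        else:
            buffers.append(payload)
    return pickle.loads(meta, buffers=buffers)


# Usage (both workers connect to the same store socket; the path is hypothetical):
# client = plasma.connect("/tmp/plasma")
# meta, frames = dumps_shared(big_numpy_array, client)
# same_data_no_copy = loads_shared(meta, frames, client)
```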

Any thoughts, before making this more complete?

Closes #xxxx

  • [ ] Tests added / passed
  • [ ] Passes pre-commit run --all-files

martindurant avatar Jun 03 '22 20:06 martindurant

Unit Test Results

15 files ±0    15 suites ±0    6h 15m 21s :stopwatch: - 16m 9s
2 838 tests +2    2 753 :heavy_check_mark: - 1    83 :zzz: +2    2 :x: +1
21 026 runs +10    20 074 :heavy_check_mark: +3    950 :zzz: +6    2 :x: +1

For more details on these failures, see this check.

Results for commit 0bdc072e. ± Comparison against base commit 6d85a853.

github-actions[bot] avatar Jun 03 '22 20:06 github-actions[bot]

@jrbourbeau , I don't remember who you suggested would be interested in discussing this idea.

martindurant avatar Jun 06 '22 16:06 martindurant

I think @crusaderky @jakirkham @gjoseph92, among others, might find this interesting

jrbourbeau avatar Jun 06 '22 17:06 jrbourbeau

Yeah this is interesting.

Another option I was looking at previously was using mmap with Zict spilling ( https://github.com/dask/zict/pull/51 ), which would allow for a lot of the same memory sharing between workers (as mmap'd files use memory in kernel space that is shared across processes by default).
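
As a rough illustration of that page-cache sharing (the file path and dtype below are placeholders, not anything Zict or the linked PR actually uses):

```python
import mmap

import numpy as np

# Worker A: spill an array to a file on local disk (hypothetical key/path).
arr = np.arange(1_000_000, dtype="float64")
with open("/tmp/dask-spill-key-0", "wb") as f:
    f.write(arr.tobytes())

# Worker B (a different process on the same node): map the file read-only.
# The pages live in the shared kernel page cache, so the data is resident
# only once no matter how many workers map it.
with open("/tmp/dask-spill-key-0", "rb") as f:
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
view = np.frombuffer(mm, dtype="float64")  # zero-copy view over the mapping
```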

cc @quasiben

jakirkham avatar Jun 06 '22 17:06 jakirkham

Thanks for trying this out @martindurant! I've wanted to do this for a long time. A couple general questions:

  1. To me, the main reason to use plasma is the optimizations it offers for transferring buffers over the network. This would also replace a lot of our complicated, bug-prone data-fetching logic with an efficient standalone subcomponent. Are you trying to do inter-worker communication here?
  2. Plasma is deprecated: https://github.com/apache/arrow/issues/13195.
  3. Given all that, if you're not going to use plasma for inter-worker communication, what's the benefit to using plasma as opposed to just writing buffers to shared memory directly? I imagine the main one is ObjectID reference counting and garbage collection?

gjoseph92 avatar Jun 06 '22 18:06 gjoseph92

I had held off asking whether we should generally consider using sharded key-value stores, but given Gabe's comment above I'm thinking it is now appropriate. Have we considered using a sharded key-value store for spilling/inter-worker communication?

Serialization would amount to handing a key and some buffers over to the store after any operation, and retrieving them from any worker as needed. This would allow the data to be requested by any worker (including the one that stored it, though we could shortcut that case). It could also be more robust, as it would be easier to recover state when restarting or after losing a worker. For cloud providers that have some builtin key-value store support, we could leverage that directly.
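
A minimal sketch of what such a store interface might look like (the names below are hypothetical, not an existing API):

```python
from typing import List, Protocol, Sequence


class ShardedStore(Protocol):
    """Hypothetical interface: after a task completes, the producing worker
    hands a key plus serialized frames to the store; any worker (including
    one started after a restart) can fetch or drop them by key."""

    def put(self, key: str, frames: Sequence[bytes]) -> None: ...

    def get(self, key: str) -> List[bytes]: ...

    def delete(self, key: str) -> None: ...
```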

Anyways something to think about :)

jakirkham avatar Jun 06 '22 18:06 jakirkham

Are you trying to do inter-worker communication here?

This is solely aimed at inter-worker and client-worker comms on a single node.

Deprecated

!? It doesn't say so at the head of https://arrow.apache.org/docs/python/plasma.html !! This was supposed to be a standout feature of the arrow ecosystem.

what's the benefit to using plasma

Yes, the idea was not to have to build another reference-counter, but to stick to trusted technology.

Zict spilling

I did consider where in the stack this was most appropriate. My thinking is that many results on a worker in the data/memory-manager are only ever accessed by subsequent tasks on the same worker, so we should only move/copy to shm when we know it will be useful, and we only know this when serialisation is required.

@jakirkham, I would be happy to consider the idea; I don't have an immediate picture of how it looks.

martindurant avatar Jun 06 '22 22:06 martindurant

This was supposed to be a standout feature of the arrow ecosystem

I understood it as something that the Ray folks built on top of pyarrow just for Ray, and then decided to upstream. The way Ray has used arrow and plasma has changed considerably over time, so I'm not too surprised if nobody else is maintaining it now (generic object serialization had exactly this lifecycle). I wouldn't be surprised if eventually Plasma moved into the ray project (and maybe even became an implementation detail without a public API). I agree the deprecation seems a bit unclear and undocumented, just saying it doesn't surprise me.

I never thought of plasma as a standout feature of the ecosystem, but rather the ability to implement something like plasma yourself was the point.

gjoseph92 avatar Jun 06 '22 22:06 gjoseph92

Zict spilling

I did consider where in the stack this was most appropriate. My thinking is that many results on a worker in the data/memory-manager are only ever accessed by subsequent tasks on the same worker, so we should only move/copy to shm when we know it will be useful, and we only know this when serialisation is required.

@jakirkham, I would be happy to consider the idea; I don't have an immediate picture of how it looks.

Yeah that's fair. Limiting to when communication is needed or memory is overwhelmed is probably good enough. One of the (admittedly obvious) benefits of shared memory is that if multiple workers on the same node need the same object in memory, we only need to pay that cost once. So moving the data to a mmap'd file before sending would keep memory usage more consistent.

I have a rough sketch of this from a couple of months back, though it's likely missing things; it's in this draft PR ( https://github.com/dask/distributed/pull/6516 ). Some things we may want to build on top of that (though they need not be in that PR) include sending serialized data ( https://github.com/dask/distributed/issues/5900 ) (maybe just adding custom serialization for mmap'd files would be enough?), and using sendfile (or similar where it is unavailable) to send data between workers without needing to load it into memory immediately ( https://github.com/dask/distributed/issues/5996 ).
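
On the sendfile point, the standard library already covers the basic case; a hedged sketch (the helper name and path are illustrative only):

```python
import socket


def stream_spilled_file(path: str, sock: socket.socket) -> int:
    """Send a spilled (possibly mmap-backed) file to a peer worker without
    loading it fully into this process's memory. socket.sendfile uses
    os.sendfile when the platform supports it and falls back to a plain
    read/send loop otherwise. Returns the number of bytes sent."""
    with open(path, "rb") as f:
        return sock.sendfile(f)
```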

Also should add am not attached to this or any other approach. Just think there is some benefit to leveraging shared memory on nodes. Think you and I have discussed this in several issues over the years. So happy with any improvement in this space 🙂

jakirkham avatar Jun 06 '22 23:06 jakirkham

@jakirkham , your mmap thing certainly has the advantage of being a really small change! I would say that there is no conflict with the kind of idea here, and I wonder whether yours has any downside. Still, it's a different use case.

Here is a tangentially interesting article on how fast raw throughput in IPC pipes can be on linux: https://mazzo.li/posts/fast-pipes.html , which is yet another part of the puzzle.

martindurant avatar Jun 07 '22 16:06 martindurant

cc @crusaderky

jakirkham avatar Jun 09 '22 16:06 jakirkham

Perhaps some of the people in this thread can have a quick brainstorm after tomorrow's dev team meeting or later in the week. I do believe that this is an important optimisation for part of our user base, and apparently relatively simple to implement in a number of different ways. So we just need to agree on the best approach(es).

martindurant avatar Jun 13 '22 17:06 martindurant

Feel free to send me an invite :)

jakirkham avatar Jun 13 '22 18:06 jakirkham

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0    15 suites ±0    6h 30m 57s :stopwatch: + 6m 11s
3 231 tests +5    3 142 :heavy_check_mark: +3    84 :zzz: +1    5 :x: +1
23 887 runs +35    22 943 :heavy_check_mark: +4    926 :zzz: +17    18 :x: +14

For more details on these failures, see this check.

Results for commit 2cfb807f. ± Comparison against base commit cff33d50.

:recycle: This comment has been updated with latest results.

github-actions[bot] avatar Jul 11 '22 20:07 github-actions[bot]

Created https://github.com/orgs/dask/projects/3/views/1 with a very early draft project plan for shm. All thoughts appreciated!

martindurant avatar Jul 11 '22 20:07 martindurant

Now does worker-worker too (which is the important case!).

Notes:

  • required passing the serialisation settings through to the workers in LocalCluster; this should probably be reverted and defined only in configuration
  • the current state of this PR includes a lot of prints to follow the data flow; of course these are not to be included in any final version

martindurant avatar Jul 13 '22 14:07 martindurant

Thanks for diving into this, @martindurant, it looks really interesting!

To follow up on @gjoseph92's comment regarding Plasma being deprecated: according to this thread on the Arrow dev mailing list, it appears as though no one is actively working on Plasma within Arrow. Just from browsing the modification dates in the apache/arrow repo, it also looks like the Plasma code has not been touched in months, if not years.

hendrikmakait avatar Jul 20 '22 18:07 hendrikmakait

Another thing to consider might be LMDB. Think there is support for LMDB in Zict.

jakirkham avatar Jul 20 '22 19:07 jakirkham

Specifically this, though: https://lmdb.readthedocs.io/en/release/#buffers ; we want memoryviews, not bytes objects.

However:

In both PyPy and CPython, returned buffers must be discarded after their producing transaction has completed or been modified in any way. To preserve buffer’s contents, copy it using bytes():

but this seems to be in the context of possible writes.

martindurant avatar Jul 20 '22 21:07 martindurant

Right, we do that in Zarr's LMDBStore, but I don't think Zict is doing this currently. It's easy enough to add, though.

We could use toreadonly() to make sure all memoryviews returned are immutable.
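
A small sketch of that pattern with py-lmdb (the path and map size are placeholders); the returned buffer is only valid while the read transaction is open, which is the constraint quoted above:

```python
import os

import lmdb

path = "/tmp/dask-shm-lmdb"  # hypothetical location for the store
os.makedirs(path, exist_ok=True)
env = lmdb.open(path, map_size=2**30)  # 1 GiB map; adjust as needed

# Writer: store a serialized frame under a key.
with env.begin(write=True) as txn:
    txn.put(b"key-0", b"x" * 1_000_000)

# Reader: with buffers=True, get() returns a zero-copy view into the map
# rather than a bytes object.
with env.begin(buffers=True) as txn:
    buf = txn.get(b"key-0")
    view = memoryview(buf).toreadonly()  # guard against accidental writes
    # Per the docs quoted above, the view must not outlive the transaction;
    # copy with bytes(view) if it needs to.
    assert view.nbytes == 1_000_000
```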

jakirkham avatar Jul 21 '22 01:07 jakirkham

Agreed, LMDB looks like just the thing if we can get around the buffer-invalidation issue (looks like probably yes); but the point, of course, was to benchmark to see which is best!

martindurant avatar Jul 21 '22 21:07 martindurant

Hi folks, I would like to introduce our project vineyard ( https://github.com/v6d-io/v6d, a CNCF sandbox project) to the Dask community.

Vineyard is a shared memory engine for distributed computing. Like plasma, vineyard provides a shared memory abstraction for connected IPC clients (see https://v6d.io/notes/getting-started.html), but unlike plasma, vineyard doesn't require objects to be serialized before they are put into vineyard; rather, it introduces a builder/resolver framework that allows clients to store the metadata and blobs separately, enabling efficient zero-copy sharing between computing engines in different languages (see more details about the idea in https://v6d.io/notes/objects.html#).

We haven't compared vineyard's performance with plasma's, but it is in our plans. If possible, I would like to introduce vineyard to the shared-memory experiments in Dask and see if it could be helpful.

The integration of plasma looks quite concise in this PR compared to my previous experiment with vineyard. I will add vineyard to my own fork first and try it with @martindurant's ongoing shm benchmarks.

Actually, we worked on some experiments integrating dask with vineyard for a while, but stopped the investigation after reading some previous discussion about introducing shared memory to dask (e.g., https://github.com/dask/distributed/issues/4497) and assuming shared memory wouldn't be considered in dask. Thanks @martindurant for diving into such a feature; it looks really exciting!

sighingnow avatar Aug 23 '22 01:08 sighingnow

Thanks @sighingnow for getting in touch. I was aware of vineyard via discussions in Ray about using it instead of or alongside plasma, but this PR was already underway by then. You will also see that I have added an implementation based on LMDB in the most recent commits, so comparing and contrasting against other frameworks is very much in my plan, and it's great to have someone knowledgeable in this mix! This PR will not be accepted as it stands, but we can use this as a space to push code for testing/benchmarking - I'd be happy for you to make PRs to this branch rather than make a separate branch, if you prefer. I must admit I don't understand how you can share objects without some serialisation, but I'm prepared to be educated.

martindurant avatar Aug 23 '22 01:08 martindurant

This PR will not be accepted as it stands, but we can use this as a space to push code for testing/benchmarking

Will submit a PR against the martindurant/distributed:shared_plasma branch.

I mist admit I don't understand how you can share objects without some serialisation

I would like to describe it as "lightweight serialization" instead, to make it clearer, as "without serialization" in my previous comments is a bit misleading. It works like pickle5 in the sense of decoupling metadata and payloads (when you serialize an object, e.g., a numpy.ndarray, with pickle5, you receive a buffer with some meta information and an out-of-band buffer for the binary payload).

In vineyard, we organize objects into metadata and payload buffers as well, but the metadata is organized as a JSON tree so it can be exchanged between different languages easily.

Besides, we have implemented a client-side allocator that can allocate memory directly in the shared memory. The client-side allocator can be configured as the array data allocator in numpy (see also https://numpy.org/neps/nep-0049.html); then, when we decide to put a numpy.ndarray into vineyard, we can save the copy, as it has already been placed in the shared memory arena. Note that unlike create_buffer/release_buffer, the malloc/free in the client-side allocator won't trigger a request/response roundtrip over the IPC socket to the server.

The allocator API hasn't been polished for Python yet but has been used in other C++ engines. It could bring added value to Dask as well, I think: if the cost of copying to shared memory can be avoided, worker-to-worker communication on the same node becomes really cheap.

sighingnow avatar Aug 23 '22 03:08 sighingnow

It might be worth looking at using Dask's builtin serialization directly as it also generates metadata and out-of-band buffers (instead of only relying on pickle protocol 5). We use MessagePack with the metadata currently, but this may also work with JSON (untested) possibly with some tweaks.
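
For reference, a minimal round-trip with that machinery (assuming numpy and a default distributed install; the example object is arbitrary):

```python
import numpy as np

from distributed.protocol import deserialize, serialize

x = np.arange(1_000_000, dtype="float64")

# header is a small msgpack-friendly dict of metadata; frames is a list of
# bytes-like out-of-band buffers (for numpy, views of the array's memory).
header, frames = serialize(x)

# Any process holding the header plus the frames (e.g. fetched from a shared
# memory store by object id) can reconstruct the object.
y = deserialize(header, frames)
assert (x == y).all()
```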

jakirkham avatar Aug 23 '22 06:08 jakirkham

Ah, yes indeed, it sounds like vineyard has something similar to dask's serialization system - which would be attractive if we didn't already have one.

The ability to directly allocate into the shared memory is a pretty big deal, though, and certainly not done by the plasma or lmdb implementations. It makes me wonder what the downsides are to allocating all buffers in shared memory, aside from the difficulty of getting the real memory footprint of a process.

martindurant avatar Aug 23 '22 14:08 martindurant

It might be worth looking at using Dask's builtin serialization ( https://distributed.dask.org/en/stable/serialization.html#serialization ) directly

I have noticed that. The data serialization protocol should be orthogonal to using vineyard as the underlying shared memory store, I think. I will give it a try.

It makes me wonder what the downsides are to allocating all buffers in shared memory, aside from the difficulty of getting the real memory footprint of a process.

Allocating all buffers in shared memory requires cooperation between the serializer and deserializer, i.e., the serializer needs to record the offset from the base address of the shared memory arena, and the deserializer needs to be able to translate that offset to an address in its own process's address space.

Actually, the "offset" is handled in both plasma and vineyard as the "object id".

From our experience integrating vineyard with other computing engines (an online graph store engine), we use a JSON tree to include all blob ids of a graph data structure and pass the JSON to another computing process, where the graph data structure is restored from the JSON (and the included blobs).

sighingnow avatar Aug 24 '22 06:08 sighingnow

It makes me wonder what the downsides are to allocating all buffers in shared memory, aside from the difficulty of getting the real memory footprint of a process.

And it would introduce the constraint that a blob (allocated memory block) needs to be freed by the allocator in its creator process, as the allocator has internal state. It works in "single writer - multiple reader (readonly)" settings, so if freeing objects created by another worker is not allowed in dask, it should be applicable as well.

sighingnow avatar Aug 24 '22 06:08 sighingnow

See very preliminary results at https://github.com/martindurant/shm-distributed#readme (and the code that made those in the same repo). Someone can tell me what I'm doing wrong in the choice of workflows or how I've implemented them. This is running against the current state of this branch.

martindurant avatar Aug 26 '22 20:08 martindurant

Thanks Martin 🙏

Raised a couple ideas as issues:

  • https://github.com/martindurant/shm-distributed/issues/1
  • https://github.com/martindurant/shm-distributed/issues/2

jakirkham avatar Aug 26 '22 22:08 jakirkham