Dagger.jl

Serialization should be done on a separate thread

shashi opened this issue on Feb 28, 2017 · 13 comments

Consider the following sequence of events:

T+1: Worker 1 finishes a task (Task 1) and notifies the scheduler.
T+2: The scheduler replies with a new task for Worker 1 (which, let's say, only depends on local data, to keep this example simple).
T+3: Worker 1 starts working on the new task (Task 2).
T+4: Worker 2, which was at work all this while, finishes and notifies the scheduler.
T+5: The scheduler replies with a new task (Task 3) for Worker 2. This task depends on the output of Task 1, which is still with Worker 1.
T+6: Worker 2 tries to fetch the data required to start working on Task 3, but blocks, because Worker 1 is still busy with Task 2 (typically in a compute-intensive for loop) and cannot serve Worker 2 the data immediately.
....Worker 2 waits....
....and waits...
T+7: Worker 1 finishes Task 2 (and notifies the scheduler).
...Worker 2 is still waiting...
T+8: Worker 1 reads Worker 2's request and responds with the requested data.
T+9: Finally the wait is over! Worker 2 can start on Task 3.
T+10: Worker 1 reads Task 4 from the scheduler, and Task 4 better not depend on Task 3, because then Worker 1 would have to wait for Worker 2 to finish Task 3........... God help these vengeful quibbling workers.

It works this way because:

  1. We don't want the scheduler process doing serialize/deserialize when it should really be scheduling things as fast as possible.
  2. It would double the number of times we serialize/deserialize the data.

The workers really shouldn't be waiting on each other. This happens all the time in matrix multiplication, and on the occasions when it doesn't, the computation is much faster! There are a few solutions to this:

  1. Have half the workers (or maybe just 2 workers) do nothing but read and relay intermediate data. Every worker sends its output to these processes after it's done, and reads from these processes when it needs some other worker's data before starting a new task.
  2. Create a special kind of task that says "wait for someone to take the data you computed for Task 1, then start working on Task 2".
  3. Do a hybrid of 1 and 2, i.e. have a deadline for the wait in 2 and fall back to 1 (see the sketch after this list).
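
A rough sketch of what option 3 could look like in Julia. Everything here is illustrative: `result_taken` stands in for a flag that the worker's data-serving code would set once a peer has fetched the result, and the relay worker in the fallback is hypothetical.

    # Illustrative sketch of option 3: wait (with a deadline) for a peer to take the
    # just-computed result, and fall back to pushing it to a relay worker otherwise.
    using Distributed   # a stdlib on current Julia

    function handoff_then_continue(result, relay_id::Int; deadline = 2.0)
        result_taken = Ref(false)   # the data-serving task would flip this to true
        status = timedwait(() -> result_taken[], deadline)
        if status == :timed_out
            # Option 1 fallback: park the result on a dedicated relay worker so this
            # worker can start its next task without blocking a peer later on.
            # (Storing it in a peer-accessible table is left out of this sketch.)
            remotecall_wait(identity, relay_id, result)
        end
        # ...now it's safe to ask the scheduler for the next task...
        return nothing
    end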

This problem arises because we just use the shared-memory scheduler from Dask (which doesn't have this problem, because, well, you don't need to communicate in a shared-memory set-up). I believe the multiprocessing scheduler in Dask was just sending results to the scheduler at the time, so I figured that approach might not be great for embarrassingly parallel workloads.

A great way to solve this issue is to hook into the dask.distributed scheduler and implement its client interface. This, I believe, will not only solve this issue, but also make sure other things (like data locality in HDFS) work great in a cluster set-up. (@mrocklin got me bootstrapped with this process; here are the simple send_msg and recv_msg functions: https://gist.github.com/shashi/e8f37c5f61bab4219555cd3c4fef1dc4). See https://github.com/dask/distributed/issues/586 for a discussion of this.

shashi avatar Feb 28 '17 04:02 shashi

Ideally you would have a separate thread doing the communication. I'm not sure what the status of Julia's threading support is; I've heard that there has been some progress here.

If Julia went the dask.distributed route (which I of course recommend), you would still need to implement dask workers in Julia, and those would need separate threads for communication and computation.

mrocklin avatar Feb 28 '17 14:02 mrocklin

@mrocklin thanks for chiming in! Julia doesn't yet have the ability to run arbitrary code on different threads. The only real API is the @threads macro for parallelizing embarrassingly parallel loops. However, what does work is that the worker can communicate and compute in separate tasks. If we manage to get the worker to start sending/receiving before it starts to compute, the computation can happen in parallel with the sending/receiving. This is handled by libuv, which is what Julia's IO system is based on.
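
A minimal sketch of that pattern, assuming the compute loop has yield points (if it doesn't, the sending task starves, which is exactly the problem described above); `sock` is any libuv-backed IO and `do_chunk` is a stand-in for the real per-chunk work:

    # Start sending the previous result in a separate task, then compute.
    # The @async task makes progress whenever the compute task yields
    # (explicitly via yield(), or implicitly at its own IO calls).
    using Sockets, Serialization   # stdlibs on current Julia

    function compute_while_sending(sock, prev_result, chunks)
        sender = @async serialize(sock, prev_result)   # serialization + IO in its own task
        results = Any[]
        for c in chunks
            push!(results, do_chunk(c))   # hypothetical per-chunk computation
            yield()                       # let the sender task and libuv make progress
        end
        wait(sender)
        return results
    end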

shashi avatar Feb 28 '17 15:02 shashi

This is made worse by the fact that you have to serialize. Julia's distributed computing primitives currently serialize directly onto the IO socket to avoid extra allocation, so I believe computation and IO are already interleaved... :(
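
One way to decouple the two, at the cost of the extra allocation mentioned above (a sketch using the Serialization stdlib of current Julia, not what the distributed primitives do today):

    # Serialize into memory first, then hand the raw bytes to libuv in one shot.
    # The serialization is still CPU work on a Julia task, but the socket write
    # itself can then proceed asynchronously while other computation runs.
    using Serialization

    function send_result(sock::IO, result)
        buf = IOBuffer()
        serialize(buf, result)        # pure CPU work, no socket involved
        bytes = take!(buf)
        @async write(sock, bytes)     # transmission driven by libuv in the background
        return length(bytes)
    end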

shashi avatar Feb 28 '17 15:02 shashi

Is there a schedule for threads in Julia?

In cases where you have lots of work up-front, the existing system would probably run well. There would be a significant lag between a task becoming ready-to-run and the appropriate worker getting the necessary data, but if you can cover this lag with extra work then you might be OK. Some workloads look like this. Some don't.

Another, possibly ugly, solution would be to host Julia workers in another language. I'm not sure how good the Python-Julia bridge is these days, but I can imagine using Python to do all of the administrative work while letting Julia handle the computation. I guess this still relies on Julia to do serialization though, which might block other Julia code running computations. Can I run multiple Julia functions from different Python threads concurrently? Is there significant overhead to all of this? If something like this were possible it would also significantly reduce the cost of creating a minimal working system.

mrocklin avatar Feb 28 '17 15:02 mrocklin

Is there a schedule for threads in Julia?

Well, libuv has one inside itself. All the async Julia features and IO go through it. I'm not sure if we can hack it to wait for incoming data requests from other workers and force a context switch.... Maybe @Keno knows.

Some workloads look like this.

You mean the scheduler can say "do X or Y" to a worker? Does this sort of thing work right now in Dask distributed? This is an interesting idea and possibly a good temporary fix for this issue.

Can I run multiple Julia functions from different Python threads concurrently?

That would run into the same problems: if you're calling into Julia and want shared memory, you'd hit the same runtime issues that need to be fixed for native threading in Julia to work.

Is there significant overhead to all of this?

The overhead of the Python-Julia bridge is negligible, and it works amazingly well for other things too.

shashi avatar Feb 28 '17 15:02 shashi

Is there a schedule for threads in Julia?

Sorry, overloaded term. Is there a rough planned calendar date when Julia will reliably support threading?

You mean the scheduler can say "do X or Y" to a worker? Does this sort of thing work right now in Dask distributed?

Yeah, so as of dask.distributed version 1.14 (I think) we started pushing more of the scheduling logic onto the workers (which also makes recreating dask in other languages somewhat more expensive). Currently every task that is ready to run is allocated to some worker. This allows workers to think a bit more and overlap communication.

Generally we did this to improve hardware utilization, both to remove the few milliseconds of delay of roundtrip to the scheduler, and to help workers bundle inter-worker communications. It would also help in this case.
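
In a Julia worker the same overlap could be expressed with tasks: while the current task computes, an @async task pulls in the inputs of the next task in the local queue. A sketch, where `fetch_remote` and the `deps`/`run` fields are hypothetical placeholders for however the worker represents tasks:

    # Overlap inter-worker communication with computation by prefetching the
    # dependencies of the next queued task while the current one runs.
    function run_queue(queue)
        prefetched = nothing
        for (i, t) in enumerate(queue)
            # Inputs for this task: either already prefetched, or fetched now.
            deps = prefetched === nothing ? map(fetch_remote, t.deps) : fetch(prefetched)
            if i < length(queue)
                nxt = queue[i + 1]
                prefetched = @async map(fetch_remote, nxt.deps)   # overlaps with the compute below
            end
            t.run(deps...)   # the overlap only helps if this (or fetch_remote) yields
        end
    end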

The overhead is negligible for the Python-Julia bridge and it's quite amazing for other things.

This might be something to think about. The Python workers have evolved a bit since we last talked. If we can reuse that logic/codebase it might be valuable.

mrocklin avatar Feb 28 '17 15:02 mrocklin

Is there a rough planned calendar date

There's been a lot of progress, and it's slated for the 1.0 release, which is coming out in late summer this year. There are still issues with the thread safety of the I/O code right now.

This allows workers to think a bit more and overlap communication.

Neat!

Is there any documentation about this I can read up on?

It would also help in this case.

Sounds great!

shashi avatar Feb 28 '17 16:02 shashi

The most relevant documentation is probably here: http://distributed.readthedocs.io/en/latest/worker.html#internal-scheduling

Note though that this is specific to the Python worker. If you write a worker in Julia, you would either reimplement this logic or write something completely new (the scheduler won't care, as long as tasks eventually complete). If we hosted Julia computation from Python, we would get this logic for free.

mrocklin avatar Feb 28 '17 16:02 mrocklin

That is pretty cool!

shashi avatar Feb 28 '17 16:02 shashi

I don't have much experience in distributed computing, but I would like to make this happen. I have some questions about hooking into the scheduler, @mrocklin.

In https://github.com/dask/distributed/issues/586#issue-183209753 you say:

The worker code: https://github.com/dask/distributed/blob/master/distributed/worker.py . It's about 1000 lines of Python, only about half of which is actually necessary for a minimal implementation.

The client code: https://github.com/dask/distributed/blob/master/distributed/client.py . It's about 2000 lines of Python, only about a fifth of which is probably necessary for a minimal implementation. Much of this is special features that have accrued over the past year.

Are those necessary lines easy to separate out? The code base is very big, and knowing which parts matter would be very helpful while I'm trying to understand the internals.

Another question I have is about the scheduler. It is language-agnostic because it communicates via messages, but does this mean I just need to launch a Python script running the scheduler from Julia, and then I can start creating workers natively?

lopezm94 avatar Mar 06 '17 15:03 lopezm94

Great questions @lopezm94 !

First, generally I think that there are probably two ways to do this:

  1. Build a worker and client in Julia. Long term this is probably best. However to do this well it would be much better if Julia supported threads, at least in some minimal way.
  2. Build a client in Julia, but reuse the existing python worker and use something like PyJulia instead. This is probably easier to get running quickly and will probably work better while Julia remains thread-challenged.

Since that comment the worker has expanded to around 2000 lines. The minimal implementation probably hasn't increased significantly though.

Within the client code, I recommend searching for calls to send_to_scheduler and the two references to self._handlers. These will point you to the messages that you will need to send to the scheduler and expect to receive back.

If you choose to reimplement the workers in Julia (rather than use PyJulia) you should look at Worker.compute_stream and some (but not all) of the handlers. This reduced set should suffice:

    handlers = {
        'gather': self.gather,                   # pull data from peer workers
        'compute-stream': self.compute_stream,   # stream of tasks arriving from the scheduler
        'get_data': self.get_data,               # serve locally held results to other workers
        'delete_data': self.delete_data,         # drop results that are no longer needed
        'terminate': self.terminate,             # shut the worker down
        'keys': self.keys,                       # report which keys this worker holds
    }

If I were trying to do this as a summer project I would probably start by seeing if reusing the Python worker with PyJulia is feasible. In this case we would replace tasks like the following:

    julia_function(x, y, z)

with tasks like the following:

    pyjulia.compile_and_run(julia-code, x, y, z)

Or something similar. Python would continue to handle all of the communication and administrative work, but Julia would do the computation, and intermediate results would live within Julia. @shashi might have other thoughts about this though.
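
The Julia side of such a shim could be tiny. A hypothetical helper (not PyJulia's actual API) might look roughly like this:

    # Hypothetical receiving end of `compile_and_run`: parse the Julia source
    # shipped over from Python, evaluate it to get a function, and apply it to
    # the already-converted arguments.
    function compile_and_run(code::AbstractString, args...)
        f = Core.eval(Main, Meta.parse(code))    # e.g. code = "(x, y) -> x .+ y"
        return Base.invokelatest(f, args...)     # invokelatest sidesteps world-age issues
    end

    # usage: compile_and_run("(x, y) -> x .+ y", [1, 2], [3, 4])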

mrocklin avatar Mar 06 '17 15:03 mrocklin

If I were trying to do this as a summer project I would probably start by seeing if reusing the Python worker with PyJulia is feasible.

Yes, I agree. First the client, and when that works, then everything else. Thanks for the explanation.

lopezm94 avatar Mar 07 '17 17:03 lopezm94

It might be good to move dask.distributed-related discussion here to https://github.com/invenia/Dispatcher.jl/issues/11

(updated OP with @mrocklin's send_msg and recv_msg functions)

shashi avatar Apr 27 '17 04:04 shashi