
Parameter server

mrocklin opened this issue · 27 comments

This is a summary of an e-mail between myself and @MLnick

From Nick

Taking the simplest version of say logistic regression, the basic idea is to split up the parameter vector (beta) itself into chunks (so it could potentially be a dask array, or some other distributed dask data structure). Training would then iterate over "mini-batches" using SGD (let's say each mini-batch is a chunk of the X array). In each mini-batch, the worker will "pull" the latest version of "beta" from the Parameter Server and compute the (local) gradient for the batch. The worker then sends this gradient to the PS, which performs the update (i.e. updates its part of "beta" using, say, the gradient from the worker and the step size). The next iteration then proceeds in the same way. This can be sync or async (but typically it is either fully async or "bounded stale" async).

The key is to do this effectively as direct communication from the worker doing the mini batch gradient computation, to the worker holding the parameters (the "parameter server"), without involving the master ("client" app) at all, and to only "pull" and "push" the part of beta required for local computation (due to sparsity this doesn't need to be the full beta in many cases). In situations where the data is very sparse (e.g. like the Criteo data) the communication is substantially reduced in this approach. And the model size can be scaled up significantly (e.g. for FMs the model size can be very large).
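
As a rough illustration of one such mini-batch step, here is a minimal sketch: pull and push are stand-ins for the worker-to-PS communication, and the gradient is for plain logistic regression on a dense NumPy chunk; none of this is code from the thread.

import numpy as np

def minibatch_step(X_chunk, y_chunk, pull, push):
    # Active feature indices for this chunk; with sparse data this is
    # typically a small fraction of the full feature space.
    idx = np.unique(X_chunk.nonzero()[1])

    # "Pull" only the needed slice of beta from the parameter server.
    beta_idx = pull(idx)

    # Local logistic-regression gradient for this mini-batch, restricted to idx.
    p = 1.0 / (1.0 + np.exp(-X_chunk[:, idx].dot(beta_idx)))
    grad_idx = X_chunk[:, idx].T.dot(p - y_chunk) / len(y_chunk)

    # "Push" the sparse gradient; the PS then applies something like
    # beta[idx] -= step_size * grad_idx.
    push(idx, grad_idx)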

This is slightly different from the way say L-BFGS works currently (and the way I seem to understand ADMM works in dask-glm) - i.e. that more or less a set of local computations are performed on the distributed data on the workers, and the results collected back to the "master", where an update step is performed (using LBFGS or the averaging of ADMM, respectively). This is also the way Spark does things.

What I'm struggling with is quite how to achieve the PS approach in dask. It seems possible to do it in a few different ways, e.g. perhaps it's possible just using simple distributed dask arrays, or perhaps using "worker_client" and/or Channels. The issue I have is how to let each worker "pull" the latest view of "beta" in each iteration, and how to have each worker "push" its local gradient out to update the "beta" view, without the "master" being involved.

I'm looking into the async work in http://matthewrocklin.com/blog/work/2017/04/19/dask-glm-2 also to see if I can do something similar here.

From me

First, there are two nodes that you might consider the "master", the scheduler and the client. This is somewhat of a deviation from Spark, where they are both in the same spot.

Second, what are your communication and computation requirements? A roundtrip from the client to scheduler to worker to scheduler to client takes around 10ms on a decent network. A worker-worker communication would be shorter, definitely, but may also involve more technology. We can do worker-to-worker direct, but I wanted to make sure that this was necessary.

Channels currently coordinate metadata through the scheduler. They work a bit like this:

  1. Worker A subscribes to channel, tells scheduler
  2. Worker B subscribes to channel, tells scheduler
  3. Worker A creates some data and registers it on the channel, tells the scheduler
  4. Scheduler tells all workers that are on this channel (A and B) that a new piece of data is on the channel
  5. Worker B says great, I want this data, and asks the scheduler where it can get it
  6. Scheduler tells Worker B that the data is on Worker A
  7. Worker B gets data from Worker A

So there are a few network hops here, although each should be in the millisecond range (I think?).

We could also set up a proper parameter server structure with single-hop communications. Building these things isn't hard. As usual my goal is to extract from this experiment something slightly more general, to see if we can hit a broader use case.

So I guess my questions become:

  1. What are your communication requirements?
  2. How much data are you likely to shove through this?
  3. Are you likely to have multiple parameter servers? If so how would you anticipate sharding communication?

From Nick

The PS idea is very simple at the high level. The "parameter server" can be thought of as a "distributed key-value store". There could be 1 or more PS nodes (the idea is precisely to allow scaling the size of model parameters across multiple nodes, such as in the case of factorization machines, neural networks etc).

A good reference paper is https://www.cs.cmu.edu/~muli/file/parameter_server_osdi14.pdf

So in theory, at the start of an iteration, a worker node asks the PS for only the parameters it needs to compute its update (in sparse data situations, this might only be a few % of the overall # features, per "partition" or "batch"). This can be thought of as a set of (key, value) pairs where the keys are vector indices and the values are vector values at the corresponding index, of the parameter vector. In practice, each PS node will hold a "slice" of the parameter vector (the paper uses a chord key layout for example), and will work with vectors rather than raw key-value pairs, for greater efficiency.
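
A minimal sketch of what one PS node's slice might look like behind that pull/push interface (illustrative only; the class, names, and simplified SGD update are assumptions, not code from this thread):

import numpy as np

class ParameterShard:
    """Holds the contiguous slice [lo, hi) of the global parameter vector."""

    def __init__(self, lo, hi, step_size=0.01):
        self.lo = lo
        self.step_size = step_size
        self.beta = np.zeros(hi - lo)

    def pull(self, indices):
        # indices are global; translate them to this shard's local offsets.
        return self.beta[np.asarray(indices) - self.lo]

    def push(self, indices, grad_values):
        # Simplified SGD update applied only to the touched entries.
        self.beta[np.asarray(indices) - self.lo] -= self.step_size * np.asarray(grad_values)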

It seems like Channels might be a decent way to go about this. Yes, there is some network comm overhead but in practice for a large scale problem, the time to actually send the data (parameters and gradients say) would dominate the few ms of network hops. This cost could also be partly hidden through async operations.

The way I thought about it with Channels, which you touch on, is:

  1. Let's say we have 1x PS worker for simplicity, and some other "compute" workers. The "compute" workers will hold the chunks of data (X, y blocks). The PS will hold "beta".
  2. PS creates an initial beta vector (random data, zeros, whatever). It could "publish" this vector (future?) on the Channel "params", saying "here is the latest version of beta".
  3. Workers start iteration 1 and pull the new "beta" (future?) off the channel. Let's say Worker A needs perhaps 10% of the total vector - so it "pulls" beta[idx] from the PS, where idx is the set of active feature indices it needs to compute its gradient.
  4. Worker A computes its partial gradient for the chunk. It needs to "push" this grad[idx] (or alternatively, a "sparse vector" version of grad) to the PS. It could push this as another future into the channel? Or perhaps another channel? But would the idea be that PS gets this future off the channel, knows that Worker A holds the data it needs, and does something like beta[idx] -= grad[idx] * step_size (simplified update), where it will know to pull grad[idx] from Worker A? And then "publishes" the new "beta future" on the "params" channel?
  5. This all happens async - so effectively a "slow" worker may "miss" a few beta updates. Workers could always poll the head of the channel for the latest.
  6. The PS could in this way implement some form of "bounded synchronous" updates.

To answer your specific questions:

  1. As I mention above, of course we'd prefer to have lowest cost communication for the above scenario - but I would expect a few ms overhead from network hops to be marginal in terms of overall cost. I would tend to start with what is "built in" and see if it works well, before trying to build more custom stuff.
  2. Quite a lot - that is the idea, to scale to large models. By large I would say typically hundreds of millions to billions of parameters in total. Each mini-batch would not typically communicate that entire parameter space, but it could still be a few million parameters per mini-batch.
  3. Yes - though even 1 PS can be useful in scaling. Sharding can range from simply splitting the param vector into contiguous chunks, to chord key layouts and other more involved architectures (mostly this is done for fault tolerance purposes); see the routing sketch just below.
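
For the simplest contiguous-chunk layout, routing a sparse pull/push to the right shards might look like this (hypothetical helper names, sketch only):

from collections import defaultdict

def shard_of(index, D, n_shards):
    # Contiguous-chunk sharding: shard s owns indices [s * chunk, (s + 1) * chunk).
    chunk = -(-D // n_shards)  # ceil(D / n_shards)
    return index // chunk

def group_by_shard(indices, D, n_shards):
    # Split a pull/push request so each PS shard only sees its own indices.
    groups = defaultdict(list)
    for i in indices:
        groups[shard_of(i, D, n_shards)].append(i)
    return groups

# e.g. group_by_shard([1, 3, 7, 9], D=10, n_shards=2) -> {0: [1, 3], 1: [7, 9]}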

From me

So here is some code just to get things started off:

import operator

import numpy as np
from distributed import worker_client

# Note: `modify` and `create_update` are placeholders for the actual
# update / gradient logic.

def parameter_server():
    beta = np.zeros(1000000)
    with worker_client() as c:
        # Publish the current beta on a length-1 channel so workers can
        # always grab the latest version.
        betas = c.channel('betas', maxlen=1)
        future_beta = c.scatter(beta)
        betas.append(future_beta)

        # Apply each incoming update and re-publish the new beta.
        updates = c.channel('updates')
        for update in updates:
            beta = modify(beta, update)
            future_beta = c.scatter(beta)
            betas.append(future_beta)


def worker(idx, x):
    with worker_client(separate_thread=False) as c:
        # Pull only the needed slice of the latest beta.
        betas = c.channel('betas', maxlen=1)
        last_beta = betas.data[-1]
        params = c.submit(operator.getitem, last_beta, idx).result()

        # Push this chunk's update for the parameter server to apply.
        update = create_update(x, params)
        updates = c.channel('updates')
        updates.append(update)
        updates.flush()

For what it's worth I expect this code to fail in some way. I think that channels will probably have to be slightly modified somehow. For example currently we're going to record all of the updates that have been sent to the updates channel. We need to have some way of stating that a reference is no longer needed. Channels need some mechanism to consume and destroy references to futures safely.

mrocklin avatar May 25 '17 08:05 mrocklin

Thanks for summarizing things. I ran into a first issue: TypeError: Don't know how to scatter <class 'numpy.ndarray'>.

I assume dask array already has serializers for numpy ndarray (and sparse too?). Is there an easy way to re-use them?

MLnick avatar May 25 '17 08:05 MLnick

Sorry, I was on master, which supports scattering singletons. Try scattering a list:

[future] = client.scatter([x])

mrocklin avatar May 25 '17 08:05 mrocklin

Thanks - that solved the scatter. But with a simplified version of ps method:

def parameter_server():
    beta = np.zeros(D)
    with worker_client() as c:
        betas = c.channel('betas', maxlen=1)
        future_beta = c.scatter([beta])
        betas.append(future_beta)

I get:

...
Exception: TypeError("can't serialize <Future: status: finished, key: c85bbd0d1718128e8eb4b46d0b5940d8>",)

Full stacktrace here: https://gist.github.com/MLnick/b42613104470501546e4e0209c58fa34

MLnick avatar May 25 '17 09:05 MLnick

Your future_beta value is a list. Try unpacking

    [future_beta] = c.scatter([beta])

mrocklin avatar May 25 '17 09:05 mrocklin

Ah thanks!

Another question - is there a way to control which workers dask arrays will be located on? i.e. Can I ensure that the dask array X is split among workers w1, w2 only?

MLnick avatar May 25 '17 09:05 MLnick

http://distributed.readthedocs.io/en/latest/locality.html

mrocklin avatar May 25 '17 09:05 mrocklin

Thanks - it seems constructing a dask array on a specified set of workers would have to be done via delayed with the workers kwarg? array.persist(workers=[...]) seemed to throw an error: TypeError: unhashable type: 'Array'

MLnick avatar May 25 '17 10:05 MLnick

I'm having a weird issue with submitted tasks on workers not being able to see updates in the channel. Here's a simple repro:

In [3]: client = Client()

In [4]: def simple_worker():
   ...:     with worker_client() as c:
   ...:         betas = c.channel('betas')
   ...:         print(betas)
   ...:

In [5]: b = client.channel('betas')

In [6]: distributed.channels - INFO - Add new client to channel, ec0e59cc-4135-11e7-8f77-a45e60e5f579, betas
distributed.channels - INFO - Add new channel betas
In [6]:

In [6]: b.append("foo")

In [7]: b.flush()

In [8]: res = client.submit(simple_worker)

In [9]: <Channel: betas - 0 elements>
distributed.channels - INFO - Add new client to channel, f7644180-4135-11e7-95ee-a45e60e5f579, betas
distributed.batched - INFO - Batched Comm Closed:

MLnick avatar May 25 '17 10:05 MLnick

Hrm, try x = client.persist(x, workers={x: [...]})

The worker client may not get updates immediately after creating the channel. It could take a small while. Typically we've resolved this in the past by iterating over the channel like for future in channel: ... however in this case where we want to get the last element this seems less than ideal.
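
For reference, getting the latest element by iterating looks roughly like this. This is only a sketch against the experimental Channel API as it is used elsewhere in this thread (that API has since been removed from distributed):

from distributed import worker_client

def latest_beta():
    with worker_client() as c:
        betas = c.channel('betas', maxlen=1)
        for future in betas:        # blocks until the channel has an element
            return future.result()  # take the first one seen and stop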

mrocklin avatar May 25 '17 11:05 mrocklin

Ok yeah - it seems to be a very small delay in being able to view the data. The "iter" works well.

This brings up a couple potentially useful things for a Channel:

  1. Be able to poll the head of the queue (or perhaps even a "slice"?) in a "blocking" manner (much like __iter__ seems to work)
  2. Be able to "clear" a Channel - during my experiments now I kept having to restart the whole lot because the channels got "full" of old updates.

MLnick avatar May 25 '17 14:05 MLnick

So, I have the very basics of a working setup for the PS and a worker, using an AdaGrad variant of SGD. I haven't properly "distributed" either the gradient computation (which should be done async, in parallel, per chunk of X, y data) or the params on the PS. Also haven't done the "sparse" update version; I've just been working with full dense arrays so far.
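
For reference, the AdaGrad-style update being used is roughly the following (a dense sketch with hypothetical names, not the actual code):

import numpy as np

def adagrad_update(beta, grad, grad_sq_sum, step_size=0.1, eps=1e-8):
    # Per-coordinate step sizes shrink as squared gradients accumulate.
    grad_sq_sum += grad ** 2
    beta -= step_size * grad / (np.sqrt(grad_sq_sum) + eps)
    return beta, grad_sq_sum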

But I did verify the Adagrad version gives results close to L-BFGS (so the logic is working).

Will post further updates as I go.

MLnick avatar May 25 '17 14:05 MLnick

Yes, so I think that rather than extending channels we might want to build a few new constructs including:

  1. Multi-producer multi-consumer queues
  2. Global singleton values

I think that these would solve your problems, would probably be useful in other contexts as well, and would probably not be that much work. This is the sort of task that might interest @jcrist if he's interested in getting into the distributed scheduler. All logic in both cases would be fully sequentialized through the scheduler, so this shouldn't require much in the way of concurrency logic (other than ensuring that state is always valid between tornado yield points). Copying (or improving upon) the channels implementation would probably be a good start. If @jcrist is busy or not very interested in this topic I can probably get to it starting Tuesday.
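
Roughly, the API shape being proposed here is what later shipped in distributed as Queue and Variable. A small sketch (the names and values are hypothetical):

import numpy as np
from dask.distributed import Client, Queue, Variable

client = Client()  # or connect to an existing scheduler

# Multi-producer multi-consumer queue: any client or worker can put/get.
updates = Queue('updates')
updates.put({'idx': [0, 2], 'grad': [0.1, -0.3]})
item = updates.get()              # blocks until an item is available

# Global singleton value: one named slot whose latest value anyone can read.
beta_var = Variable('beta')
beta_var.set(client.submit(np.zeros, 1000))  # typically holds a future
latest = beta_var.get().result()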

Hopefully this doesn't block @MLnick from making progress on ML work. Hopefully we'll be able to progress in parallel.

thoughts?

mrocklin avatar May 25 '17 21:05 mrocklin

Checking in here. @MLnick are you still able to make progress (with what time you have)?

mrocklin avatar May 30 '17 18:05 mrocklin

Hey, meant to post an update but been a bit tied up.

I got the basics working. Here is the gist.

Current limitations / TODOs:

  1. This seems to be computing the worker chunks in serial - so this should be done async in parallel of course.
  2. Doesn't yet handle the "sparse" version, i.e. pulling only active feature indices. So this is just simple dense arrays to illustrate the PoC
  3. Only 1 PS

Having said that, it seems to work pretty well in principle, and doing the "sparse pull" version should be straightforward to add.

Note the solution found by the SGD version is different from the L-BFGS one. For this small test case it is very close in terms of accuracy metrics, but for larger problems I think some things will need tweaking (e.g. item (1) above, step sizes, iterations and maybe early stopping criteria).

Expanding beyond 1 PS will be more involved since the beta data needs to be sharded across the PS nodes.

MLnick avatar May 31 '17 09:05 MLnick

Some feedback on the code:

  1. In worker() you might want to create the channels outside of the for loop
  2. This line blocks on each call to compute: res2 = [d.compute(workers=['w1', 'w2']) for d in res]. Instead you probably want futures = client.compute(res); client.gather(futures)
  3. Rather than use a fixed number of iterations you might consider sending a stop signal from the client or another long-lived process using a channel, with channel.stop(). This would allow you to have an arbitrary stopping condition.

mrocklin avatar May 31 '17 11:05 mrocklin

Expanding beyond 1 PS will be more involved since the beta data needs to be sharded across the PS nodes.

How is this typically done? Do the workers always send updates to both parameter servers?

mrocklin avatar May 31 '17 11:05 mrocklin

How is this typically done? Do the workers always send updates to both parameter servers?

It really depends on architecture.

Glint for example is done with Akka actors. It has a "masterless" architecture. There is a single "actor reference" representing the (client connection to) the array on the PS. It exposes a push(indices, values) and pull(indices) method. The PS Array actor takes care of sending the correct indices and values to each shard.

One could also have a "master" PS node (or coordinator node) that handles the sharding, splitting up requests and re-routing them to the relevant PS.

It could get a bit involved depending on impl details.

MLnick avatar May 31 '17 13:05 MLnick

Another thought is perhaps there's a way to have the PS beta be a dask array (distributed across say 2 PS for example). Then the "reference" to that array is passed to workers. When they compute() on that they should automatically pull the data from where it is living on PS nodes, right?

What I'm not sure on is the update part - the Dask array on the PS must be updated and perhaps persisted (to force computation and the latest "view")? That's where I'm hazy. But it seems to me that the handling of the sharding / distributing of the params on the PS could be taken care of more automatically by dask arrays.
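
The pull side of that idea might look like the following sketch (the scheduler address, worker names, and chunking are assumptions); the update/re-persist side is exactly the part that's unclear above:

import dask.array as da
from dask.distributed import Client

client = Client('scheduler-address:8786')              # hypothetical address
D = 1_000_000
beta = da.zeros(D, chunks=D // 2)                      # two chunks -> two PS workers
beta = client.persist(beta, workers=['ps-1', 'ps-2'])  # hypothetical worker names

# A consumer only needs a sparse subset; slicing + compute pulls just the
# relevant chunk data from whichever PS worker holds it. From inside a task
# this would go through worker_client().
idx = [3, 17, 999_000]
beta_idx = beta[idx].compute()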

MLnick avatar May 31 '17 13:05 MLnick

Yeah, my brain went that way as well.

First, the simple way to do this is just to have a channel for each parameter server and to manage the slicing ourselves manually. This isn't automatic but also isn't that hard.

However, the broader question you bring up is a dask collection (bag, array, dataframe, delayed) that is backed by a changing set of futures. In principle this is the same as a channel, except that rather than pushing data into a deque we would push data into a random access data structure. I might play with this a bit and see how far I get in a short time.

I hope that this doesn't block you on near-term progress though. I suspect that we can go decently far on single-parameter server systems. Do you have a sense for what your current performance bottlenecks are?

mrocklin avatar May 31 '17 14:05 mrocklin

I played with and modified your script a bit here: https://gist.github.com/MLnick/27d71e2a809a54d82381428527e4f494

This starts multiple worker-tasks concurrently. It also evaluates the change in beta over time.

mrocklin avatar May 31 '17 16:05 mrocklin

Thanks, will take a look.

When I get some more time I will try to do the sparse param updates and then test things out on some larger (sparse) data (e.g. Criteo).

MLnick avatar May 31 '17 18:05 MLnick

Whoops, it looks like my last comment copied over MLnick's implementation rather than my altered one. Regardless, here is a new one using Queues and Variables.

https://gist.github.com/mrocklin/a92785743744b5c698984e16b7065037

Things look decent, although at the moment the parameter server isn't able to keep up with the workers. We may want to either batch many updates at once or switch to asynchronous work to overlap communication latencies.

mrocklin avatar Jun 05 '17 16:06 mrocklin

This currently depends on https://github.com/dask/distributed/pull/1133

FWIW to me this approach feels much nicer than relying on channels as before. Some flaws:

  1. As mentioned, the parameter server can't keep up. I suspect that we want a get_many method on the queue to implement batching.
  2. As stated, we may also want to enter into the full async API to help hide latencies. This is probably something we should do after exhausting the synchronous API though. I'd also like a good performance benchmark before going into this (which maybe someone else can help to provide?).
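
As a rough sketch of the batching idea (draining whatever has accumulated before touching beta, standing in for a get_many), assuming the compute workers scatter their gradients and put the resulting futures on an 'updates' Queue. This is not the gist's code and the names are hypothetical:

import numpy as np
from dask.distributed import Queue, Variable, worker_client

def parameter_server(D=1000, step_size=0.01, max_updates=10_000):
    beta = np.zeros(D)
    with worker_client() as c:
        updates = Queue('updates', client=c)   # workers put gradient futures here
        beta_var = Variable('beta', client=c)  # latest beta lives here
        [beta_future] = c.scatter([beta])
        beta_var.set(beta_future)

        seen = 0
        while seen < max_updates:
            # Drain everything currently queued ("get_many" by hand) ...
            batch = [updates.get()]
            for _ in range(updates.qsize()):
                batch.append(updates.get())
            # ... apply the whole batch, then publish beta once.
            for grad in c.gather(batch):
                beta -= step_size * grad
            seen += len(batch)
            [beta_future] = c.scatter([beta])
            beta_var.set(beta_future)


def push_gradient(grad):
    # Worker side: scatter the gradient locally and enqueue the future.
    with worker_client() as c:
        [g] = c.scatter([grad])
        Queue('updates', client=c).put(g)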

@MLnick if you're looking for a narrative for a blogpost, talk, etc.. then we might consider the progression of sequential computation on the parameter server, to batched, to asynchronous/batched. I'm personally curious to see the performance implications of these general choices to this problem. We have (or can easily construct) APIs for all of these fairly easily.

mrocklin avatar Jun 05 '17 16:06 mrocklin

@MLnick any objection to my including this as an example in a small blogpost?

mrocklin avatar Jun 12 '17 15:06 mrocklin

I'm happy to give you attribution for the work.

mrocklin avatar Jun 12 '17 15:06 mrocklin

Sounds great - I won't be able to help with it this week, but otherwise happy to work on something or help review.

MLnick avatar Jun 12 '17 15:06 MLnick

progression of sequential computation on the parameter server, to batched, to asynchronous/batched

I'd be interested in seeing this comparison, especially as number of workers/communication channels increase.

I've looked through the PS in https://github.com/dask/dask-glm/issues/57#issuecomment-306228865. As far as I can tell, the main benefit behind this PS is with async communication: the above distributes communication to many worker-PS channels (which means it's not limited by bandwidth of one worker-PS channel). Correct?

asynchronous/batched

Can you expand what you mean by this? I am interested in async updates that do not have locks on beta (i.e., with distributed optimization algorithms Hogwild! and Cyclades). Can the async updates you're talking about be applied to these algorithms?

stsievert avatar Jul 21 '17 22:07 stsievert