
configurable `distribute_reduction_method` in Model.

Open kretes opened this issue 2 years ago • 30 comments

Additionally, this allows reducing with "sum" to account for metrics/losses being divided by the global batch size when training with a custom training step and MirroredStrategy.

This addresses https://github.com/keras-team/keras/issues/15509

kretes avatar Jun 08 '22 12:06 kretes

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

google-cla[bot] avatar Jun 08 '22 12:06 google-cla[bot]

I ran all the unit tests in keras/engine, however I couldn't find a good way and an example for the test to write for this functionality. I would appreciate some guidance here.

kretes avatar Jun 08 '22 12:06 kretes

I ran all the unit tests in keras/engine, however I couldn't find a good way and an example for the test to write for this functionality. I would appreciate some guidance here.

Check out keras/engine/training_test.py, e.g. test_compile_fit_evaluate_predict_with_mirrored_strategy. You can write a similar kind of test, but for a model with a custom train_step, and with a correctness check.

fchollet avatar Jun 08 '22 22:06 fchollet

I've added the test now - it fails if one removes the model.distribute_reduction_method = "sum" configuration

kretes avatar Jun 10 '22 15:06 kretes

Hello @kretes, thanks for the PR. Model.fit() takes care of the loss scaling, and upon some investigation I believe this change (allowing a different reduction method) is not needed, for the following reasons:

  1. With MirroredStrategy, the output from strategy.run(run_step) is no longer a distributed value (and therefore it takes this path) because Keras uses "sum aggregation" with "on-read synchronization" for metric weights. That is, as soon as you take the division of total and count, the two variables synchronize across the replicas, so the result of the division is always a simple eager tensor.

  2. With TPUStrategy, TPUDistributedVariable synchronizes the values across the replicas before Keras sees the result, so there do exist multiple values (representing the values across the replicas), but they all contain the same value. It takes the reduction='first' path, which is fine because taking any one of them yields the same result. A "sum" reduction method won't make sense because the loss would become proportional to the number of replicas. The code path keeps the MirroredStrategy/PSS case unaffected regardless of 'first' or 'sum', but it does affect TPUStrategy, in which case using the 'sum' reduction method causes incorrect results.

rchao avatar Jun 15 '22 19:06 rchao

Hi @rchao, thanks for the review.

  • With MirroredStrategy, the output from the strategy.run(run_step) is no longer a distributed value (and therefore it takes this path) [...]

Investigating the strategy.run(run_step) output in a test case I wrote (invoked in make_train_function and make_test_function) - simply by adding a print statement - shows me this:

{'loss': PerReplica:{
  0: <tf.Tensor 'truediv:0' shape=() dtype=float32>,
  1: <tf.Tensor 'replica_1/truediv:0' shape=() dtype=float32>
}}

and so it isn't caught by https://github.com/keras-team/keras/blob/b88f655b5fb81d157e1489cf4f3eff577a81203f/keras/engine/training.py#L3684-L3685 and the code proceeds to do a reduction.

It can also be confirmed that changing the setup in the test - by commenting out the line https://github.com/keras-team/keras/blob/ab78a0dd1fe955f88ef75c912e6bf183def07eda/keras/engine/training_test.py#L184 - makes it fail; therefore the reduction method does affect the returned metrics.

We observe the same when running a real training: without the change, the metrics we see are too small by a factor of the number of replicas.

[...] because Keras uses "sum aggregation" with "on-read synchronization" for metric weights. That is, as soon as you take the division of total and count, the two variables synchronize across the replicas, so the result of the division is always a simple eager tensor.

I don't know Keras well enough to say how this affects this use case. To be sure - I am not using any metric weights here.

  • With TPUStrategy, TPUDistributedVariable synchronizes the values across the replicas before Keras sees the result, so there do exist multiple values (representing the values across the replicas), but they all contain the same value. It takes the reduction='first' path, which is fine because taking any one of them yields the same result. A "sum" reduction method won't make sense because the loss would become proportional to the number of replicas. The code path keeps the MirroredStrategy/PSS case unaffected regardless of 'first' or 'sum', but it does affect TPUStrategy, in which case using the 'sum' reduction method causes incorrect results.

I am not using this path, so this might be harder to nail. Is there any way to simulate TPUStrategy in tests?


The whole story started by following the docs in https://www.tensorflow.org/guide/distributed_training#use_tfdistributestrategy_with_custom_training_loops, especially this fragment:

You used [tf.nn.compute_average_loss](https://www.tensorflow.org/api_docs/python/tf/nn/compute_average_loss) to compute the loss. [tf.nn.compute_average_loss](https://www.tensorflow.org/api_docs/python/tf/nn/compute_average_loss) sums the per example loss and divides the sum by the global_batch_size. This is important because later after the gradients are calculated on each replica, they are aggregated across the replicas by summing them.

Here we are talking about gradients being aggregated by summing - that's why I assumed the same should be done for metrics. This is what we talked about in https://github.com/keras-team/keras/issues/15509, and it seemed a good direction - see https://github.com/keras-team/keras/issues/15509#issuecomment-948794332
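As a sanity check of the scaling described in that fragment, here is a tiny framework-free sketch (the loss values are made up, and tf.nn.compute_average_loss is only emulated by the division by the global batch size):

```python
# Hypothetical numbers illustrating the scaling above: when each replica
# divides its per-example loss sum by the *global* batch size, summing
# across replicas recovers the mean over the full batch - mirroring how
# summed gradients behave.
per_example_losses = [1.0, 2.0, 3.0, 4.0]  # global batch of 4 examples
global_batch_size = len(per_example_losses)

# Shard the batch across two replicas.
replica_batches = [per_example_losses[:2], per_example_losses[2:]]

# Each replica computes sum(losses) / global_batch_size,
# emulating tf.nn.compute_average_loss.
per_replica_loss = [sum(b) / global_batch_size for b in replica_batches]

# "sum" reduction across replicas recovers the true mean loss,
# while "first" would drop the second replica's share.
reduced = sum(per_replica_loss)
true_mean = sum(per_example_losses) / global_batch_size
assert abs(reduced - true_mean) < 1e-9
```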

kretes avatar Jun 17 '22 07:06 kretes

Hello @kretes, thanks for more information. With your testing code, can you check a couple things:

  1. do the returned replica values from strategy.run() have the same values, or they differ?
  2. if you increase the number of replicas, do the loss and metrics value observed from Model.fit change? They are supposed to be the same if the layers have the same initialized weights.

rchao avatar Jun 22 '22 17:06 rchao

  • do the returned replica values from strategy.run() have the same values, or they differ?

  • if you increase the number of replicas, do the loss and metrics value observed from Model.fit change? They are supposed to be the same if the layers have the same initialized weights.

When I increase the number of replicas I get different values. E.g. for batch_size=10 and 4 devices I get [0.3 0.3 0.3 0.1], which sums to 1 and looks like a division of 10 examples into 3, 3, 3, 1. With 2 devices the result is [0.5, 0.5].

Note: in the test, loss/metric values are stubbed, disregarding the network/data; the only thing that counts is the number of examples in the per-device batch and the global batch size.
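The 3/3/3/1 split above can be reproduced with a small stand-alone sketch (per_replica_fractions is a hypothetical helper, not a Keras API, and real tf.data sharding may differ; it only mirrors the numbers reported here):

```python
import math

def per_replica_fractions(global_batch_size, num_replicas):
    # Even sharding with ceil-sized shards first:
    # 10 examples on 4 devices -> shards of 3, 3, 3, 1.
    shard = math.ceil(global_batch_size / num_replicas)
    sizes = []
    remaining = global_batch_size
    for _ in range(num_replicas):
        take = min(shard, remaining)
        sizes.append(take)
        remaining -= take
    # Each replica's stubbed loss is its shard size over the global batch,
    # so only summing across replicas brings the total back to 1.
    return [s / global_batch_size for s in sizes]

print(per_replica_fractions(10, 4))
print(per_replica_fractions(10, 2))
```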

I've committed a version that prints those values during tests now - see https://github.com/keras-team/keras/pull/16664/commits/c64345e2bea55853f0ac15e1fcc726e1c2a42376.

I then run `bazel test --test_output=all keras/engine:training_test --test_filter=test_distribution_reduction_method` to get the output.

kretes avatar Jun 30 '22 13:06 kretes

Hi @kretes Can you please resolve conflicts? Thank you!

gbaned avatar Jul 04 '22 16:07 gbaned

@gbaned yes, I've merged master and resolved conflicts.

kretes avatar Jul 06 '22 13:07 kretes

@kretes, would you please run `sh shell/format.sh` to format your code? It is failing the code style check in the CI.

Thanks.

haifeng-jin avatar Jul 06 '22 20:07 haifeng-jin

@haifeng-jin code reformatted

kretes avatar Jul 07 '22 07:07 kretes

This branch cannot be rebased due to conflicts

Please rebase. If you run into issues, you can just create a new PR from a new fork.

fchollet avatar Jul 08 '22 19:07 fchollet

Hi @kretes Can you please check @fchollet's comments and keep us posted ? Thank you!

gbaned avatar Jul 14 '22 15:07 gbaned

@gbaned it's rebased now on top of master

kretes avatar Jul 18 '22 08:07 kretes

@rchao Were all of your comments addressed?

haifeng-jin avatar Jul 18 '22 17:07 haifeng-jin

Thanks @kretes! I took a closer look at the test cases, and it appears to me that the "sum" reduction method is needed here because of the custom train_step, whereas with the default train_step and gradient aggregation, we don't expect "sum" to be necessary.

I think this PR is fine as long as we make sure it's clear that default training step users shouldn't need to adjust the method via the setter. Can you supplement such information (by default, tf.distribute takes care of synchronization so that "first" is sufficient) in distribute_reduction_method's doc?

rchao avatar Jul 18 '22 20:07 rchao

@rchao @kretes may I suggest `auto` as the default value, with the following decision flow:

class Reduction:
  AUTO = 'auto'
  FIRST = 'first'
  SUM = 'sum'
  CONCAT = 'concat'

def reduce_per_replica(values, strategy, reduction=Reduction.AUTO):
  if reduction == Reduction.AUTO:
    reduction = Reduction.FIRST if _is_tpu_multi_host(strategy) else Reduction.SUM

  def _reduce(v):
    ...

This form won't break TPU code and will work for both default and custom train_step without any user intervention.
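For what it's worth, the decision flow above can be sketched in a runnable, framework-free form (the strategy check is stubbed as a plain boolean, since `_is_tpu_multi_host` and the real strategy objects live inside Keras; only the control flow is the point):

```python
class Reduction:
    AUTO = "auto"
    FIRST = "first"
    SUM = "sum"
    CONCAT = "concat"

def resolve_reduction(is_tpu_multi_host, reduction=Reduction.AUTO):
    # "auto" keeps TPU on the safe "first" path and lets everything else
    # (e.g. MirroredStrategy with a custom train_step) use "sum".
    if reduction == Reduction.AUTO:
        return Reduction.FIRST if is_tpu_multi_host else Reduction.SUM
    return reduction
```

An explicitly configured reduction is passed through untouched, so only users who leave the default get the automatic behavior.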

It's very difficult to do research in Keras, and I believe part of this is because we are always in custom setups (custom train_step, multiple GPUs) that require understanding and care over a million tweaks. This would be one less thing to worry about, and thus helpful to researchers.

lucasdavid avatar Jul 19 '22 18:07 lucasdavid

@rchao yes, that's right indeed, it is just needed when a custom train_step is used. I've added a relevant note in the docstring in https://github.com/keras-team/keras/pull/16664/commits/5be6742aedc47c23555b642b599f50323b85c61e

At the same time it might be worth pointing users to this option by adding a note in https://www.tensorflow.org/guide/distributed_training#use_tfdistributestrategy_with_custom_training_loops . I will extend the documentation there and open a PR in tf for that.

kretes avatar Jul 26 '22 07:07 kretes

@lucasdavid I understand the idea of an auto default value. At the same time, the suggested implementation would address something different from the issue I am trying to solve.

So far, for TPU usage, the reduction has always been 'first' in the train and test steps. What do you want to achieve now?

Your suggested code snippet changes the reduction to 'sum' whenever the user didn't configure it and _is_tpu_multi_host(strategy) is False. Why should that be?

kretes avatar Jul 26 '22 07:07 kretes

@kretes from what I gathered from @rchao's comment,

The code path prevents the MirroredStrategy/PSS case from getting affected regardless of 'first' or 'sum', but it does affect TPUStrategy in which case using 'sum' reduction method causes incorrect results.

sum will work for both default and custom training loops: the former contains a single already-reduced value, and the latter contains multiple values that should be summed. Meanwhile, first is the only valid option for TPU training.

Is my understanding correct? If so, my suggestion to add the auto (default) value describes exactly this behavior. Users with custom training loops will no longer need to set distribute_reduction_method manually and will still get correct loss values, while users with default training loops and TPU training won't be affected. Hopefully it makes more sense now.

lucasdavid avatar Jul 26 '22 13:07 lucasdavid

@lucasdavid Thanks for the explanation - I get it now. The current version in this branch doesn't change the default, so for TPU it stays 'first'. However, I've updated the docstrings (https://github.com/keras-team/keras/pull/16664/commits/88859985f40a9cfb91762d7cfb76c2fc59565f63) to be explicit about it.

I see how an 'auto' mode would be great. What I'm missing is how to detect whether training runs through the default model.fit train step or a custom training loop - I understand we should be using first in the former and sum in the latter.

At the same time, I don't know if this would be a consistent approach. Passing to @rchao for an opinion.

kretes avatar Jul 26 '22 15:07 kretes

The auto approach makes sense to me, but it seems to me that 'first' should be used by default for auto in order to avoid behavior changes? Thanks for the discussion.

rchao avatar Jul 26 '22 17:07 rchao

Unless I'm missing something, sum can be used for MirroredStrategy and will not cause behavioral changes:

  1. MirroredStrategy + default training loop: sum works, because first([l1]) == sum([l1]) == l1 == l_total
  2. MirroredStrategy + custom training loop: sum is the correct choice here: sum([l1, l2, ..., l_batch]) == l_total
  3. TPUStrategy + default training loop: first is assumed
  4. TPUStrategy + custom training loop: first is assumed
  5. MultiWorkerMirroredStrategy: this one I'm not sure about. Is there a way to simulate it locally?
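Cases 1 and 2 can be checked numerically with a toy reduction (hypothetical loss values, not the actual Keras reduce_per_replica implementation):

```python
def reduce_vals(values, reduction):
    # Toy stand-in for the per-replica reduction being discussed:
    # "first" takes the first replica's value, "sum" adds them all up.
    return values[0] if reduction == "first" else sum(values)

# Case 1: default train loop - a single already-reduced value,
# so "first" and "sum" agree.
assert reduce_vals([2.5], "first") == reduce_vals([2.5], "sum") == 2.5

# Case 2: custom train loop - each replica holds a partial mean
# (sum(batch) / global_batch), so only "sum" recovers the total,
# while "first" drops the other replica's share.
partials = [1.5, 1.0]
assert reduce_vals(partials, "sum") == 2.5
assert reduce_vals(partials, "first") == 1.5
```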

I wrote the changes in this commit and a new test case (test_distribution_reduction_method_sum_default_train_step) alongside kretes's, demonstrating that it works as expected for MirroredStrategy + the default train loop. I also ran all tests in keras/backend; they all pass.

lucasdavid avatar Jul 26 '22 19:07 lucasdavid

Thanks @lucasdavid ! I've pushed your commit to the branch in this PR and updated docstrings + texts on top of it. I guess this is ready for a review, @rchao

kretes avatar Jul 27 '22 09:07 kretes

haha, thanks for the enthusiasm. I wasn't expecting you to pull the commit; it was more just to show that it works and that the tests pass. In any case, I think the auto option is worth discussing a little more, especially for MultiWorkerMirroredStrategy and other strategies that I might not know of. I also did not find any correctness tests for TPUStrategy with custom training loops, so maybe these paths are uncovered?

lucasdavid avatar Jul 27 '22 17:07 lucasdavid

I wasn't expecting you to pull the commit, it was more just to show that it works and that the tests pass.

I reviewed it; it is an implementation of the auto option as well, so I think it will be easiest to review the whole code at once.

I think it's worth discussing the auto option a little more, especially for MultiWorkerMirroredStrategy and other strategies that I might not know of. I also did not find any correctness tests for TPUStrategy with custom training loops. So maybe these paths are uncovered?

I'm not familiar with this either - passing that to @rchao

kretes avatar Jul 29 '22 09:07 kretes

Thanks! I've added a few quick comments.

rchao avatar Jul 29 '22 18:07 rchao

@kretes Can you please check @rchao's comments and keep us posted ? Thank you!

gbaned avatar Aug 03 '22 15:08 gbaned

@kretes I wrote tests for MultiWorkerMirroredStrategy in this commit. They are being skipped in my laptop though. I think I don't have the appropriate hardware for it.

lucasdavid avatar Aug 09 '22 19:08 lucasdavid

It's my understanding that we could still do some work on this (update the docs as requested by @rchao and maybe add some tests for the multi-worker case), and that it shouldn't have been merged just yet.

lucasdavid avatar Aug 15 '22 12:08 lucasdavid

Ok, I can see the PR got merged. While this isn't a huge problem, I think we should now create a new PR with the doc update requested in https://github.com/keras-team/keras/pull/16664#discussion_r940609739 and the test from https://github.com/keras-team/keras/pull/16664#issuecomment-1209815851. Do you want to do it, @lucasdavid, as you are the author of the test?

kretes avatar Aug 16 '22 13:08 kretes

Okay, I can try to do it.

lucasdavid avatar Aug 17 '22 22:08 lucasdavid