keras icon indicating copy to clipboard operation
keras copied to clipboard

More Tests for customizable reduction strategy in model

Open lucasdavid opened this issue 3 years ago • 14 comments

#16664 was automatically merged, but we were still working on it.

This PR only fixes the docs and adds a couple more sanity tests to cover reduction="auto" in the MultiWorkerMirroredStrategy case. Not sure if they should be added at multi_worker_test or distribute_strategy_test, but I can move it if necessary.

lucasdavid avatar Aug 22 '22 16:08 lucasdavid

Thank you for the PR.

@rchao, can you please take a look?

fchollet avatar Aug 23 '22 17:08 fchollet

@lucasdavid Can you please check @rchao's comments and keep us posted ? Thank you!

gbaned avatar Aug 30 '22 14:08 gbaned

@rchao done. Sorry for the delay. Difficult month.

lucasdavid avatar Aug 30 '22 19:08 lucasdavid

@rchao done

lucasdavid avatar Aug 31 '22 03:08 lucasdavid

Fantastic. Thank you very much.

rchao avatar Aug 31 '22 15:08 rchao

I believe the test is not running... Can find it in the CPU or GPU logs.

lucasdavid avatar Aug 31 '22 16:08 lucasdavid

Here are the internal errors, @lucasdavid can you please verify ? Thank you!

Traceback (most recent call last): File "/py/absl/testing/parameterized.py", line 318, in bound_param_test return test_method(self, **testcase_params) File "/tensorflow/python/framework/test_combinations.py", line 360, in decorated execute_test_method() File "/tensorflow/python/framework/test_combinations.py", line 343, in execute_test_method test_method(**kwargs_to_pass) File "/tensorflow/python/distribute/combinations.py", line 601, in decorator self.fail(result.message) AssertionError: Traceback (most recent call last): File "/py/absl/testing/parameterized.py", line 318, in bound_param_test return test_method(self, **testcase_params) File "/tensorflow/python/framework/test_combinations.py", line 360, in decorated execute_test_method() File "/tensorflow/python/framework/test_combinations.py", line 343, in execute_test_method test_method(**kwargs_to_pass) File "/tensorflow/python/distribute/combinations.py", line 559, in decorator test_method(self, **kwargs) File "/py/keras/distribute/multi_worker_test.py", line 327, in test_distribution_reduction_method_auto_custom_train_step x = keras.layers.Flatten(inputs) File "/py/keras/layers/reshaping/flatten.py", line 65, in init self.data_format = conv_utils.normalize_data_format(data_format) File "/py/keras/utils/conv_utils.py", line 223, in normalize_data_format data_format = value.lower() AttributeError: 'KerasTensor' object has no attribute 'lower'

gbaned avatar Sep 05 '22 21:09 gbaned

@gbaned I believe I fixed the issue.

lucasdavid avatar Sep 07 '22 13:09 lucasdavid

It failed again. I apologize for that. I'm doing my best, but its difficult to see what's wrong without being able to test this locally or see the logs. I'd appreciate if you could once again paste the logs here.

lucasdavid avatar Sep 13 '22 14:09 lucasdavid

It failed again. I apologize for that. I'm doing my best, but its difficult to see what's wrong without being able to test this locally or see the logs. I'd appreciate if you could once again paste the logs here.

Just replied with a comment. Thank you for your patience!

rchao avatar Sep 13 '22 20:09 rchao

@rchao I think I figured it out. There are two problems. Firstly, function multi_worker_testing_utils.mnist_synthetic_dataset had a glitch which ignored the parameter batch_size and set batch=64 for all cases. (Other test cases that use this function work because they call it with batch_size=64.)

Secondly, multi-worker is indeed being synchronized before reduce_per_replica is called (for both custom and default training loops). Evidence:

if not _is_per_replica_instance(v):
    print('*' * 80)
    print(f'_reduce not per-replica instance v={v}')
    return v
[chief-0]:     _reduce not per-replica instance v=Tensor("truediv:0", shape=(), dtype=float32, device=/job:chief/replica:0/task:0/device:CPU:0)

At the same time, let global_batch_size=32, steps=2 and epochs=2. In these conditions, using tf.nn.compute_average_loss(loss_value, global_batch_size=global_batch_size) seems to result in the expected loss value being divided by 2 (the number of workers):

AssertionError: 
Not equal to tolerance rtol=1e-06, atol=1e-06
Mismatched value: a is different from b. 
...
 x: array([1.5, 1.5])
 y: array([3., 3.])

If I replace tf.nn.compute_average_loss by tf.reduce_sum, I would expect it to return batch*loss=32*3=96.
Instead, it is returning 16*3=32/workers * 3 = 48:

AssertionError:
Not equal to tolerance rtol=1e-06, atol=1e-06
Mismatched value: a is different from b. 
...
 x: array([48., 48.])
 y: array([3., 3.])

Lastly, tf.reduce_mean(loss_value) fixes the test case for the multi-worker strategy, but it wouldn't result in the correct loss value for mirror strategy (as demonstrated by keras.engine.training_test:test_distribution_reduction_method_sum_default_train_step).

lucasdavid avatar Sep 19 '22 00:09 lucasdavid

Thank you for the info! I see that the tests are passing now. I'll review shortly and get back to you as soon as I can.

rchao avatar Sep 20 '22 16:09 rchao

I might have not expressed myself clearly: I figured out what the error was (I believe MultiWorkerMirroredStrategy#run is returning the expected loss value divided by num_workers/two), but I am unsure how to fix it as this code is not part of keras, but tensorflow.distribute.

We can fix the testcase by changing the loss function to num_workers * nn.compute_loss(..), but that would not be aligned with what is taught in tensorflow/keras' tutorials. I believe something could be wrong with how reduction is being handed by MWMS.

The failing tests cases are ran and shown as import/copybara, so they didn't run yet. I haven't fixed them, as I am waiting your thoughts on the matter.

lucasdavid avatar Sep 20 '22 16:09 lucasdavid

Is it possible that MWMS is doing reduction ReductionOp.MEAN? (distribute/distribute_lib.py#L1449) That would explain why we are getting loss/2.

lucasdavid avatar Sep 20 '22 16:09 lucasdavid

I might have not expressed myself clearly: I figured out what the error was (I believe MultiWorkerMirroredStrategy#run is returning the expected loss value divided by num_workers/two), but I am unsure how to fix it as this code is not part of keras, but tensorflow.distribute.

I see. The loss of the result of Model.fit() should not change with the strategy being used. Are you suspecting that there needs to be a fix on tf.distribute side? Is so, we'll need that before we can proceed with a change here in Keras.

rchao avatar Sep 21 '22 19:09 rchao

The loss of the result of Model.fit() should not change with the strategy being used.

Yes, that's the idea behind my proposed testcase: if all samples have associated loss of 3.0, then the result should be 3.0 regardless of any other component in the model.

Are you suspecting that there needs to be a fix on tf.distribute side?

Yes. Based on the numbers I'm getting, I **believe** MWMS is either discarding half the values or performing ReductionOp.MEAN here, with the former being incorrect and the second being incompatible with tf.nn.compute_average_loss (which expects ReductionOp.SUM).

lucasdavid avatar Sep 21 '22 21:09 lucasdavid

The loss of the result of Model.fit() should not change with the strategy being used.

Yes, that's the idea behind my proposed testcase: if all samples have associated loss of 3.0, then the result should be 3.0 regardless of any other component in the model.

Are you suspecting that there needs to be a fix on tf.distribute side?

Yes. Based on the numbers I'm getting, I believe MWMS is either discarding half the values or performing ReductionOp.MEAN here, with the former being incorrect and the second being incompatible with tf.nn.compute_average_loss (which expects ReductionOp.SUM).

Thanks Lucas! I think opening a PR on tf.distribute is a good next step and we can go from there.

rchao avatar Sep 21 '22 23:09 rchao

@rchao I fixed it. The problem was that experimental_local_results was discarding the results from the second worker.

More detailed:

I mentioned above that results were already gathered when reduce_per_replica was called, but this is clearly wrong. (And it only becomes obvious with model.compile(..., run_eagerly=True).)
I changed the input dataset, making the target values assume the values [0, 1, 2, 3] instead of [1, 1, ..., 1]. The resulting loss was 0.25 = (0 + 1) / global_batch_size. Hence the values of the second worker are clearly being discarded. There is a note about this on experimental_local_results's docs:

Note: This only returns values on the worker initiated by this client. When using a tf.distribute.Strategy like tf.distribute.experimental.MultiWorkerMirroredStrategy, each worker will be its own client, and this function will only return values computed on that worker.

How I fixed it: I used strategy.reduce("SUM", v, axis=None) when reduction was sum and the env. was the MWMS. Is that acceptable? Let me know what you think.

lucasdavid avatar Sep 28 '22 00:09 lucasdavid

@rchao I fixed it. The problem was that experimental_local_results was discarding the results from the second worker.

More detailed:

I mentioned above that results were already gathered when reduce_per_replica was called, but this is clearly wrong. (And it only becomes obvious with model.compile(..., run_eagerly=True).) I changed the input dataset, making the target values assume the values [0, 1, 2, 3] instead of [1, 1, ..., 1]. The resulting loss was 0.25 = (0 + 1) / global_batch_size. Hence the values of the second worker are clearly being discarded. There is a note about this on experimental_local_results's docs:

Note: This only returns values on the worker initiated by this client. When using a tf.distribute.Strategy like tf.distribute.experimental.MultiWorkerMirroredStrategy, each worker will be its own client, and this function will only return values computed on that worker.

How I fixed it: I used strategy.reduce("SUM", v, axis=None) when reduction was sum and the env. was the MWMS. Is that acceptable? Let me know what you think.

Thanks for the investigation! This makes sense to me.

rchao avatar Sep 30 '22 03:09 rchao

@rchao done.

While we are here, I would like to make two suggestions regarding the reduction property. Let me know if any of them interest you and I will refactor the code.

  1. Rename distribute_reduction_method to distribute_reduction, which is shorter and closer to Loss.reduction without adding any ambiguity:
    model = MyModel(inputs=inputs, outputs=x)
    model.distribute_reduction = 'auto'
    model.compile(
        optimizer="sgd",
        run_eagerly=False,
    )
    
  2. Add it as a parameter in the compile method, given its code proximity with the run_eagerly property (which is also a param in compile) and in order to make model build easier (from yaml/json configuration files, for example):
    model = MyModel(inputs=inputs, outputs=x)
    model.
    model.compile(
        optimizer="sgd",
        run_eagerly=False,
        distribute_reduction='auto',
    )
    

lucasdavid avatar Sep 30 '22 14:09 lucasdavid

Thanks for the suggestions @lucasdavid! I think both are making sense, the only potential concern being that Model.compile() already has a long list of arguments. That said, I don't think we should block this PR on the additional discussions, so I would open a new PR to continue the discussions there and see people's opinions. What do you think?

rchao avatar Oct 01 '22 00:10 rchao

Sounds good. :-) Sorry I pushed the wrong button. I think you need to approve it again. :grimacing:

lucasdavid avatar Oct 01 '22 01:10 lucasdavid