More Tests for customizable reduction strategy in model
#16664 was automatically merged, but we were still working on it.
This PR only fixes the docs and adds a couple more sanity tests to cover reduction="auto" in the MultiWorkerMirroredStrategy case.
I'm not sure whether they should be added to multi_worker_test or distribute_strategy_test, but I can move them if necessary.
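For reference, here is a minimal sketch of the kind of sanity check being added (illustrative only, not the actual test code: the multi-worker harness, combinations decorators, and the real model are omitted, and `fit_and_get_loss` is a hypothetical helper name). The idea is to build a model under the strategy scope, set `distribute_reduction_method`, and check that the reported loss does not depend on the strategy in use:

```python
import numpy as np
from tensorflow import keras

def fit_and_get_loss(strategy, reduction_method="auto"):
    # Hypothetical sketch; the real test lives in keras/distribute/multi_worker_test.py.
    with strategy.scope():
        inputs = keras.Input(shape=(4,))
        outputs = keras.layers.Dense(1)(inputs)
        model = keras.Model(inputs, outputs)
        model.distribute_reduction_method = reduction_method
        model.compile(optimizer="sgd", loss="mae")
    x = np.ones((32, 4), dtype="float32")
    y = np.ones((32, 1), dtype="float32")
    history = model.fit(x, y, batch_size=8, epochs=1, verbose=0)
    # The reported loss should be the same regardless of the strategy.
    return history.history["loss"][0]
```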
Thank you for the PR.
@rchao, can you please take a look?
@lucasdavid Can you please check @rchao's comments and keep us posted? Thank you!
@rchao done. Sorry for the delay. Difficult month.
@rchao done
Fantastic. Thank you very much.
Here are the internal errors, @lucasdavid can you please verify? Thank you!
```
Traceback (most recent call last):
  File "/py/absl/testing/parameterized.py", line 318, in bound_param_test
    return test_method(self, **testcase_params)
  File "/tensorflow/python/framework/test_combinations.py", line 360, in decorated
    execute_test_method()
  File "/tensorflow/python/framework/test_combinations.py", line 343, in execute_test_method
    test_method(**kwargs_to_pass)
  File "/tensorflow/python/distribute/combinations.py", line 601, in decorator
    self.fail(result.message)
AssertionError: Traceback (most recent call last):
  File "/py/absl/testing/parameterized.py", line 318, in bound_param_test
    return test_method(self, **testcase_params)
  File "/tensorflow/python/framework/test_combinations.py", line 360, in decorated
    execute_test_method()
  File "/tensorflow/python/framework/test_combinations.py", line 343, in execute_test_method
    test_method(**kwargs_to_pass)
  File "/tensorflow/python/distribute/combinations.py", line 559, in decorator
    test_method(self, **kwargs)
  File "/py/keras/distribute/multi_worker_test.py", line 327, in test_distribution_reduction_method_auto_custom_train_step
    x = keras.layers.Flatten(inputs)
  File "/py/keras/layers/reshaping/flatten.py", line 65, in __init__
    self.data_format = conv_utils.normalize_data_format(data_format)
  File "/py/keras/utils/conv_utils.py", line 223, in normalize_data_format
    data_format = value.lower()
AttributeError: 'KerasTensor' object has no attribute 'lower'
```
@gbaned I believe I fixed the issue.
It failed again. I apologize for that. I'm doing my best, but it's difficult to see what's wrong without being able to test this locally or see the logs. I'd appreciate it if you could once again paste the logs here.
Just replied with a comment. Thank you for your patience!
@rchao I think I figured it out. There are two problems. Firstly, the function multi_worker_testing_utils.mnist_synthetic_dataset had a glitch that ignored the batch_size parameter and used a batch of 64 in all cases, as sketched below. (Other test cases that use this function work because they call it with batch_size=64.)
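A hypothetical illustration of that glitch (the real helper is keras/distribute/multi_worker_testing_utils.mnist_synthetic_dataset; its exact code differs, this only shows the shape of the bug):

```python
import tensorflow as tf

def mnist_synthetic_dataset_buggy(batch_size, steps_per_epoch):
    # The batch_size argument is accepted but effectively ignored: a
    # hard-coded 64 is used when batching, so every caller silently
    # receives batches of 64.
    x = tf.random.uniform((batch_size * steps_per_epoch, 28, 28, 1))
    y = tf.random.uniform(
        (batch_size * steps_per_epoch, 1), maxval=10, dtype=tf.int32)
    return (tf.data.Dataset.from_tensor_slices((x, y))
            .repeat()
            .batch(64))  # should be .batch(batch_size)
```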
Secondly, multi-worker is indeed being synchronized before reduce_per_replica is called (for both custom and default training loops). Evidence:
```python
if not _is_per_replica_instance(v):
    print('*' * 80)
    print(f'_reduce not per-replica instance v={v}')
    return v
```

```
[chief-0]: _reduce not per-replica instance v=Tensor("truediv:0", shape=(), dtype=float32, device=/job:chief/replica:0/task:0/device:CPU:0)
```
At the same time, let global_batch_size=32, steps=2 and epochs=2. In these conditions, using tf.nn.compute_average_loss(loss_value, global_batch_size=global_batch_size) seems to result in the expected loss value being divided by 2 (the number of workers):
```
AssertionError:
Not equal to tolerance rtol=1e-06, atol=1e-06
Mismatched value: a is different from b.
...
x: array([1.5, 1.5])
y: array([3., 3.])
```
If I replace tf.nn.compute_average_loss with tf.reduce_sum, I would expect it to return batch * loss = 32 * 3 = 96.
Instead, it returns (32 / num_workers) * 3 = 16 * 3 = 48:
```
AssertionError:
Not equal to tolerance rtol=1e-06, atol=1e-06
Mismatched value: a is different from b.
...
x: array([48., 48.])
y: array([3., 3.])
```
Lastly, tf.reduce_mean(loss_value) fixes the test case for the multi-worker strategy, but it wouldn't result in the correct loss value for MirroredStrategy (as demonstrated by keras.engine.training_test:test_distribution_reduction_method_sum_default_train_step).
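To make the arithmetic above explicit, here is a small worked example of the three loss formulations, assuming a per-sample loss of 3.0, global_batch_size=32, two workers (16 samples each), and assuming that only the local worker's value survives (or that a MEAN is taken across workers):

```python
per_sample_loss = 3.0
global_batch_size = 32
num_workers = 2
per_worker_batch = global_batch_size // num_workers  # 16

# tf.nn.compute_average_loss sums the LOCAL per-sample losses and divides by
# the GLOBAL batch size; it expects the per-replica results to be SUM-reduced.
local_average_loss = per_worker_batch * per_sample_loss / global_batch_size  # 1.5 (observed)
expected_if_summed = num_workers * local_average_loss                        # 3.0 (expected)

# tf.reduce_sum over the local batch:
local_sum = per_worker_batch * per_sample_loss                               # 48.0 (observed)
expected_if_summed_sum = num_workers * local_sum                             # 96.0 (expected)

# tf.reduce_mean over the local batch is already 3.0 on each worker, which is
# why it "works" under MWMS here but scales incorrectly under MirroredStrategy.
local_mean = per_sample_loss                                                 # 3.0
```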
Thank you for the info! I see that the tests are passing now. I'll review shortly and get back to you as soon as I can.
I might have not expressed myself clearly: I figured out what the error was (I believe MultiWorkerMirroredStrategy#run is returning the expected loss value divided by num_workers, which is two here), but I am unsure how to fix it, as this code is not part of Keras but of tensorflow.distribute.
We could fix the test case by changing the loss function to num_workers * tf.nn.compute_average_loss(...), but that would not be aligned with what is taught in the TensorFlow/Keras tutorials.
I believe something could be wrong with how reduction is being handled by MWMS.
The failing checks are shown as import/copybara, so the test cases haven't actually run yet. I haven't fixed them, as I am waiting for your thoughts on the matter.
Is it possible that MWMS is doing reduction ReductionOp.MEAN? (distribute/distribute_lib.py#L1449) That would explain why we are getting loss/2.
I see. The loss of the result of Model.fit() should not change with the strategy being used. Are you suspecting that there needs to be a fix on tf.distribute side? If so, we'll need that before we can proceed with a change here in Keras.
> The `loss` of the result of `Model.fit()` should not change with the strategy being used.
Yes, that's the idea behind my proposed testcase: if all samples have associated loss of 3.0, then the result should be 3.0 regardless of any other component in the model.
> Are you suspecting that there needs to be a fix on tf.distribute side?
Yes. Based on the numbers I'm getting, I **believe** MWMS is either discarding half the values or performing ReductionOp.MEAN here, with the former being incorrect and the second being incompatible with tf.nn.compute_average_loss (which expects ReductionOp.SUM).
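For reference, this is the standard pairing from the tf.distribute custom training loop guidance: tf.nn.compute_average_loss divides by the global batch size precisely because the per-replica results are expected to be SUM-reduced afterwards, so a MEAN reduction would effectively divide by the number of workers a second time. A minimal sketch (illustrative, not the test's actual code):

```python
import tensorflow as tf

GLOBAL_BATCH_SIZE = 32

def replica_loss(labels, predictions, loss_fn):
    # Per-example losses computed on this replica only.
    per_example_loss = loss_fn(labels, predictions)
    # Divide by the GLOBAL batch size; this is only correct if the
    # per-replica results are later combined with ReduceOp.SUM,
    # not ReduceOp.MEAN.
    return tf.nn.compute_average_loss(
        per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
```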
Thanks Lucas! I think opening a PR on tf.distribute is a good next step and we can go from there.
@rchao I fixed it. The problem was that experimental_local_results was discarding the results from the second worker.
In more detail:
I mentioned above that results were already gathered when reduce_per_replica was called, but this is clearly wrong. (And it only becomes obvious with model.compile(..., run_eagerly=True).)
I changed the input dataset so that the target values are [0, 1, 2, 3] instead of [1, 1, ..., 1].
The resulting loss was 0.25 = (0 + 1) / global_batch_size. Hence the values of the second worker are clearly being discarded. There is a note about this in the experimental_local_results docs:
> Note: This only returns values on the worker initiated by this client. When using a tf.distribute.Strategy like tf.distribute.experimental.MultiWorkerMirroredStrategy, each worker will be its own client, and this function will only return values computed on that worker.
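Spelling that out (assuming, as the example implies, that each sample's loss equals its target and the global batch size is 4):

```python
targets = [0.0, 1.0, 2.0, 3.0]   # global batch, split as [0, 1] and [2, 3]
global_batch_size = 4

local_only = (0.0 + 1.0) / global_batch_size    # 0.25 -> what was observed
all_workers = sum(targets) / global_batch_size  # 1.5  -> what SUM-reduction should give
```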
How I fixed it:
I used strategy.reduce("SUM", v, axis=None) when the reduction was "sum" and the strategy in use was MWMS.
Is that acceptable? Let me know what you think.
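A minimal sketch of that approach (the surrounding wiring in reduce_per_replica is omitted and the helper name is made up here):

```python
import tensorflow as tf

def sum_across_all_workers(strategy, per_replica_value):
    # strategy.experimental_local_results(v) only returns the values computed
    # on the worker initiated by this client, whereas strategy.reduce with
    # ReduceOp.SUM aggregates the contributions from every worker.
    return strategy.reduce(
        tf.distribute.ReduceOp.SUM, per_replica_value, axis=None)
```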
Thanks for the investigation! This makes sense to me.
@rchao done.
While we are here, I would like to make two suggestions regarding the reduction property. Let me know if any of them interest you and I will refactor the code.
- Rename `distribute_reduction_method` to `distribute_reduction`, which is shorter and closer to `Loss.reduction` without adding any ambiguity:

  ```python
  model = MyModel(inputs=inputs, outputs=x)
  model.distribute_reduction = 'auto'
  model.compile(
      optimizer="sgd",
      run_eagerly=False,
  )
  ```

- Add it as a parameter of the compile method, given its code proximity with the `run_eagerly` property (which is also a param in compile) and in order to make model building easier (from yaml/json configuration files, for example):

  ```python
  model = MyModel(inputs=inputs, outputs=x)
  model.compile(
      optimizer="sgd",
      run_eagerly=False,
      distribute_reduction='auto',
  )
  ```
Thanks for the suggestions @lucasdavid! I think both make sense; the only potential concern is that Model.compile() already has a long list of arguments. That said, I don't think we should block this PR on the additional discussions, so I would open a new PR to continue the discussions there and see people's opinions. What do you think?
Sounds good. :-) Sorry, I pushed the wrong button. I think you need to approve it again. :grimacing: