Gradient accumulate optimizer
Describe the feature and the current behavior/state.
Hi, it would be good if this repo supported a gradient accumulation optimizer. The feature is really helpful for anyone training large models such as BERT with limited resources. The usage should be similar to tfa.optimizers.SWA:
opt = ...
accumulate_opt = tfa.optimizers.AccumulationOptimizer(opt, accumulate_steps=5)
There is an implementation of a gradient accumulator here (link), but it targets a custom training loop rather than Keras model.fit.
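To make the idea concrete, here is a minimal, framework-free sketch of gradient accumulation in a hand-written loop. It is not the linked implementation: the model (a single weight fitted by least squares), the data, and the names `grad_mse` and `sgd_step` are all made up for illustration. It shows that averaging the gradients of equally sized micro-batches and applying them once gives the same update as one large batch.

```python
def grad_mse(w, batch):
    """Gradient of the mean squared error for the 1-D model y = w * x."""
    return sum(2.0 * (w * x - y) * x for x, y in batch) / len(batch)


def sgd_step(w, grad, lr=0.01):
    """One plain gradient descent update."""
    return w - lr * grad


data = [(1.0, 2.0), (2.0, 4.1), (3.0, 5.9), (4.0, 8.2)]

# Reference: a single update computed from the full batch of four examples.
w_full = sgd_step(0.0, grad_mse(0.0, data))

# Same update assembled from two micro-batches of two examples each.
accum_steps = 2
micro_batches = [data[:2], data[2:]]
accumulated = 0.0
for batch in micro_batches:
    # The weight stays fixed at 0.0 while gradients are being accumulated.
    accumulated += grad_mse(0.0, batch)
w_accum = sgd_step(0.0, accumulated / accum_steps)
```

Whether the accumulated gradients are averaged (as here) or simply summed is a design choice; summing is equivalent to scaling the learning rate by the number of accumulation steps.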
Relevant information
- Are you willing to contribute it (yes/no): no
- Are you willing to maintain it going forward? (yes/no): no
- Is there a relevant academic paper? (if so, where):
- Is there already an implementation in another framework? (if so, where): here, but for a custom training loop.
- Was it part of tf.contrib? (if so, where): no
Which API type would this fall under (layer, metric, optimizer, etc.)
optimizer
Who will benefit with this feature?
All TensorFlow users.
Any other info.
@tomerk please bring this up in ecosystem review, though I don't expect any conflicts. In the future, do you need us to tag anyone, or is the ecosystem-review label sufficient?
@seanpmorgan https://github.com/tensorflow/tensorflow/issues/32176
@tomerk @bhack @seanpmorgan any update :D.
@bhack just a reminder :D
Check https://github.com/tensorflow/addons/pull/2196#issuecomment-747098470
Notes from ecosystem review:
Rather than an optimizer that wraps another optimizer, we think this might actually make sense as an object to use as a gradient_transformer in optimizers (a new feature after an optimizer refactoring earlier this year):
https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/Optimizer
Looping in @omalleyt12 who might have more insight / suggestions on how to do this.
Depending on how it comes out it might make sense in core, but addons seems like a good initial spot.
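As a rough illustration of what a gradient-transformer style accumulator could look like, here is a framework-free sketch. Everything in it is hypothetical: the class name `AccumulatingTransformer`, the use of plain floats for gradients, and the use of string names in place of variables. The only assumption carried over from the notes above is the shape of the hook, a callable that receives a list of (gradient, variable) pairs and returns a list of the same form.

```python
class AccumulatingTransformer:
    """Hypothetical stateful callable: grads_and_vars in, grads_and_vars out."""

    def __init__(self, accum_steps):
        self.accum_steps = accum_steps
        self.calls = 0
        self.sums = {}

    def __call__(self, grads_and_vars):
        self.calls += 1
        for grad, name in grads_and_vars:
            self.sums[name] = self.sums.get(name, 0.0) + grad
        if self.calls % self.accum_steps != 0:
            # Not an update step: hand back zero gradients.
            return [(0.0, name) for _, name in grads_and_vars]
        # Update step: emit the averaged gradients and start a new window.
        out = [(self.sums[name] / self.accum_steps, name) for _, name in grads_and_vars]
        self.sums.clear()
        return out


transform = AccumulatingTransformer(accum_steps=2)
first = transform([(1.0, "w"), (4.0, "b")])
second = transform([(3.0, "w"), (0.0, "b")])
```

One caveat with this shape: a transformer can only rewrite gradients, it cannot skip the update. On the in-between steps an optimizer with internal state (momentum, Adam moments) would still run on the zero gradients, so the result is not necessarily identical to a true skipped step.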
@omalleyt12 can you take a look?
@tomerk @bhack @omalleyt12 a gentle ping in case you missed my previous comment :D
We have a near-duplicate ticket at https://github.com/tensorflow/tensorflow/issues/32176
@bhack it's been a long time; is there any plan for this feature in TF2?
It would be great to have such a param:
From a user perspective, having a parameter (e.g. in .fit) like gradient_accumulate_batch_frequency=1 that updates gradients every N batches would be perfect. (1 as default to update every batch, as it is at the moment)
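A small sketch of the bookkeeping such a parameter implies, with no framework involved. The function `count_updates` and its `flush_remainder` flag are hypothetical and exist only to illustrate the proposal; the parameter name is taken from the comment above. It counts how many weight updates one epoch would perform, and makes explicit a question the proposal leaves open: what to do with leftover batches when the number of batches is not a multiple of the frequency.

```python
def count_updates(num_batches, gradient_accumulate_batch_frequency=1, flush_remainder=True):
    """Return how many weight updates one epoch of num_batches would perform."""
    if gradient_accumulate_batch_frequency < 1:
        raise ValueError("frequency must be >= 1")
    updates = 0
    pending = 0  # batches accumulated since the last update
    for _ in range(num_batches):
        pending += 1
        if pending == gradient_accumulate_batch_frequency:
            updates += 1
            pending = 0
    if pending and flush_remainder:
        # Apply the partially filled window instead of discarding it.
        updates += 1
    return updates
```

With the default of 1 every batch triggers an update, which matches the current behaviour described above. With 10 batches and a frequency of 4, the last 2 batches either produce a third, smaller update or are dropped, depending on `flush_remainder`.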
IMO, this feature is not needed, as we can implement gradient accumulation in a custom training loop in TF 2. Link.
@innat we need it when we want to use the tf.keras model fit function :))).
That should also be possible to achieve by overriding the train_step method, which customizes the .fit function.
Overriding train_step can itself be considered a custom training loop. We need a plug-and-play module, and it should apply to all models without a custom train_step.
If you want to plug and play, then try this. https://github.com/CyberZHG/keras-gradient-accumulation
Overriding train_step doesn't necessarily mean writing a custom training loop (link). This is the leverage we get from the new tf.keras, and it should be adopted. Too much plug-and-play code can lead to a lot of breakage when mid- or high-level packages are updated.
@innat I know the train_step function in tf.keras. I also implemented gradient accumulation in my framework (https://github.com/TensorSpeech/TensorFlowTTS). I'm not talking about how to implement it; I'm saying that we need this module in this repo (easy to maintain, stable, and adapted to new TF versions). I can implement GA in both train_step and a custom training loop, but a GA wrapper module that works for all base optimizers would be better :D.
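The "wrapper for all base optimizers" idea can be sketched without any framework. The classes below (`SGD`, `Momentum`, `Accumulating`) and their `apply(params, grads)` interface are invented for this illustration and are not the TensorFlow or Addons API. The point is only that the wrapper needs nothing from the inner optimizer beyond a way to apply gradients, so the same wrapper works for any of them.

```python
class SGD:
    """Toy optimizer: plain gradient descent on a dict of floats."""

    def __init__(self, lr=0.1):
        self.lr = lr

    def apply(self, params, grads):
        for name, grad in grads.items():
            params[name] -= self.lr * grad


class Momentum:
    """Toy optimizer with per-parameter state (a velocity)."""

    def __init__(self, lr=0.1, beta=0.9):
        self.lr, self.beta, self.velocity = lr, beta, {}

    def apply(self, params, grads):
        for name, grad in grads.items():
            v = self.beta * self.velocity.get(name, 0.0) + grad
            self.velocity[name] = v
            params[name] -= self.lr * v


class Accumulating:
    """Hypothetical wrapper: forwards to the inner optimizer every accum_steps calls."""

    def __init__(self, inner, accum_steps):
        self.inner, self.accum_steps = inner, accum_steps
        self.calls, self.sums = 0, {}

    def apply(self, params, grads):
        self.calls += 1
        for name, grad in grads.items():
            self.sums[name] = self.sums.get(name, 0.0) + grad
        if self.calls % self.accum_steps == 0:
            # Hand the summed gradients to the wrapped optimizer, then reset.
            self.inner.apply(params, self.sums)
            self.sums = {}


params = {"w": 1.0}
opt = Accumulating(SGD(lr=0.1), accum_steps=2)
opt.apply(params, {"w": 1.0})  # accumulates only, weights untouched
after_first = params["w"]
opt.apply(params, {"w": 3.0})  # applies the summed gradient 4.0
after_second = params["w"]
```

Swapping `SGD(lr=0.1)` for `Momentum()` requires no change to the wrapper, which is the plug-and-play property being asked for. Because the inner optimizer is only called on update steps, its internal state is not advanced in between.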
@dathudeptrai understood. It sounds great then. However, I'm facing some issues implementing GA by customizing .fit. In case you're interested, please have a look here: https://github.com/tensorflow/tensorflow/issues/47578.
Update: solved. https://gist.github.com/innat/ba6740293e7b7b227829790686f2119c
The above gradient accumulation implementation doesn't work with TF 2.5 under a multi-GPU distribution strategy (strategy = tf.distribute.MultiWorkerMirroredStrategy()).
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

import tensorflow as tf
from tensorflow_addons.utils import types
from typeguard import typechecked


class GradientAccumulator(tf.keras.optimizers.Optimizer):
    """Optimizer wrapper for gradient accumulation."""

    @typechecked
    def __init__(
        self,
        optimizer: types.Optimizer,
        accum_steps: types.TensorLike = 4,
        name: str = "GradientAccumulator",
        **kwargs,
    ):
        r"""Construct a new GradientAccumulator optimizer.

        Args:
            optimizer: str or `tf.keras.optimizers.Optimizer` that will be
                used to compute and apply gradients.
            accum_steps: int > 0. Update gradient in every accumulation steps.
            name: Optional name for the operations created when applying
                gradients. Defaults to "GradientAccumulator".
            **kwargs: keyword arguments. Allowed to be {`clipnorm`,
                `clipvalue`, `lr`, `decay`}. `clipnorm` is clip gradients by
                norm; `clipvalue` is clip gradients by value, `decay` is
                included for backward compatibility to allow time inverse
                decay of learning rate. `lr` is included for backward
                compatibility, recommended to use `learning_rate` instead.
        """
        super().__init__(name, **kwargs)
        self._optimizer = tf.keras.optimizers.get(optimizer)
        self._gradients = []
        self._accum_steps = accum_steps

    def _create_slots(self, var_list):
        self._optimizer._create_slots(var_list=var_list)
        for var in var_list:
            self.add_slot(var, "ga")

        self._gradients = [self.get_slot(var, "ga") for var in var_list]

    @property
    def gradients(self):
        """The accumulated gradients on the current replica."""
        if not self._gradients:
            raise ValueError(
                "The accumulator should be called first to initialize the gradients"
            )
        return list(
            gradient.read_value() if gradient is not None else gradient
            for gradient in self._gradients
        )

    def apply_gradients(self, grads_and_vars, name=None, **kwargs):
        self._optimizer._iterations = self.iterations
        return super().apply_gradients(grads_and_vars, name, **kwargs)

    def _resource_apply_dense(self, grad, var, apply_state=None):
        accum_gradient = self.get_slot(var, "ga")
        if accum_gradient is not None and grad is not None:
            accum_gradient.assign_add(
                grad, use_locking=self._use_locking, read_value=False
            )

        def _apply():
            if "apply_state" in self._optimizer._dense_apply_args:
                train_op = self._optimizer._resource_apply_dense(
                    accum_gradient.read_value(), var, apply_state=apply_state
                )
            else:
                train_op = self._optimizer._resource_apply_dense(
                    accum_gradient.read_value(), var
                )
            reset_op = accum_gradient.assign(
                tf.zeros_like(accum_gradient),
                use_locking=self._use_locking,
                read_value=False,
            )
            return tf.group(train_op, reset_op)

        apply_op = tf.cond(
            (self.iterations + 1) % self._accum_steps == 0, _apply, lambda: tf.no_op()
        )
        return apply_op

    def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_state):
        accum_gradient = self.get_slot(var, "ga")
        if accum_gradient is not None and grad is not None:
            self._resource_scatter_add(accum_gradient, indices, grad)

        def _apply():
            if "apply_state" in self._optimizer._sparse_apply_args:
                train_op = self._optimizer._resource_apply_sparse(
                    accum_gradient.sparse_read(indices),
                    var,
                    indices,
                    apply_state=apply_state,
                )
            else:
                train_op = self._optimizer._resource_apply_sparse(
                    accum_gradient.sparse_read(indices), var, indices
                )
            reset_op = accum_gradient.assign(
                tf.zeros_like(accum_gradient),
                use_locking=self._use_locking,
                read_value=False,
            )
            return tf.group(train_op, reset_op)

        apply_op = tf.cond(
            (self.iterations + 1) % self._accum_steps == 0, _apply, lambda: tf.no_op()
        )
        return apply_op

    def reset(self):
        """Resets the accumulated gradients on the current replica."""
        assign_ops = []
        if not self._gradients:
            return assign_ops

        for gradient in self._gradients:
            if gradient is not None:
                assign_ops.append(
                    gradient.assign(
                        tf.zeros_like(gradient),
                        use_locking=self._use_locking,
                        read_value=False,
                    )
                )

        return tf.group(assign_ops)

    @property
    def lr(self):
        return self._optimizer._get_hyper("learning_rate")

    @lr.setter
    def lr(self, lr):
        self._optimizer._set_hyper("learning_rate", lr)

    @property
    def learning_rate(self):
        return self._optimizer._get_hyper("learning_rate")

    @learning_rate.setter
    def learning_rate(self, learning_rate):
        self._optimizer._set_hyper("learning_rate", learning_rate)

    def get_config(self):
        config = {"accum_steps": self._accum_steps}
        base_config = super().get_config()
        return {**base_config, **config}

mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10)
])
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer=GradientAccumulator(tf.keras.optimizers.Adam(), accum_steps=4),
              loss=loss_fn,
              metrics=['accuracy'])
model.fit(x_train, y_train, epochs=5)
Here is my implementation
@fsx950223 Many thanks. If it's stable, could you make a pull request to add this feature to tensorflow_addons?
I'm not sure, maybe you could test it.
import tensorflow as tf
from tensorflow_addons.utils import types
from typeguard import typechecked


class GradientAccumulator(tf.keras.optimizers.Optimizer):
    """Gradient accumulation utility.

    When used with a distribution strategy, the accumulator should be called in a
    replica context. Gradients will be accumulated locally on each replica and
    without synchronization. Users should then call ``.gradients``, scale the
    gradients if required, and pass the result to ``apply_gradients``.
    """

    @typechecked
    def __init__(self, optimizer: types.Optimizer, accum_steps: types.TensorLike = 4, name: str = 'gradient_accumulator', **kwargs):
        """Initializes the accumulator."""
        super().__init__(name, **kwargs)
        self._optimizer = tf.keras.optimizers.get(optimizer)
        self._gradients = []
        self._accum_steps = accum_steps

    def _create_slots(self, var_list):
        self._optimizer._create_slots(var_list=var_list)
        for var in var_list:
            self.add_slot(var, "ga")

        self._gradients = [self.get_slot(var, "ga") for var in var_list]

    @property
    def gradients(self):
        """The accumulated gradients on the current replica."""
        if not self._gradients:
            raise ValueError(
                "The accumulator should be called first to initialize the gradients"
            )
        return list(
            gradient.value() if gradient is not None else gradient
            for gradient in self._gradients
        )

    def apply_gradients(self, grads_and_vars, name=None, **kwargs):
        self._optimizer._iterations = self.iterations
        return super().apply_gradients(grads_and_vars, name, **kwargs)

    def _resource_apply_dense(self, grad, var, apply_state=None):
        accum_gradient = self.get_slot(var, 'ga')
        if accum_gradient is not None and grad is not None:
            accum_gradient.assign_add(grad, use_locking=self._use_locking, read_value=False)

        def _apply():
            if "apply_state" in self._optimizer._dense_apply_args:
                train_op = self._optimizer._resource_apply_dense(
                    accum_gradient, var, apply_state=apply_state
                )
            else:
                train_op = self._optimizer._resource_apply_dense(accum_gradient, var)
            reset_op = self.reset()
            return tf.group(train_op, reset_op)

        apply_op = tf.cond(self.iterations % self._accum_steps == 0, _apply, lambda: tf.no_op())
        return apply_op

    def reset(self):
        """Resets the accumulated gradients on the current replica."""
        assign_ops = []
        if not self._gradients:
            return assign_ops

        for gradient in self._gradients:
            if gradient is not None:
                assign_ops.append(gradient.assign(tf.zeros_like(gradient), use_locking=self._use_locking, read_value=False))

        return tf.group(assign_ops)

    @property
    def lr(self):
        return self._optimizer._get_hyper("learning_rate")

    @lr.setter
    def lr(self, lr):
        self._optimizer._set_hyper("learning_rate", lr)

    @property
    def learning_rate(self):
        return self._optimizer._get_hyper("learning_rate")

    @learning_rate.setter
    def learning_rate(self, learning_rate):
        self._optimizer._set_hyper("learning_rate", learning_rate)

    def get_config(self):
        config = {
            "accum_steps": self.accum_steps
        }
        base_config = super().get_config()
        return {**base_config, **config}


mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10)
])
predictions = model(x_train[:1]).numpy()
# tf.nn.softmax(predictions).numpy()
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer=GradientAccumulator(tf.keras.optimizers.Adam()),
              loss=loss_fn,
              metrics=['accuracy'])
model.fit(x_train, y_train, epochs=5)
Here is my implementation
When running this code on TF 2.5 Keras (run_eagerly=False) with MultiWorkerMirroredStrategy on 8 GPUs, training is about 2 times slower than running without gradient accumulation (this is when using the class with accum_steps=1). Do you know the reason for this 2x slowdown? The training time should have stayed the same.
Could you provide TensorFlow profiles?
I found the default setting is faster than accum_steps=1 on a single device.
You could test the code in my PR, in which I have fixed several bugs.
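One visible difference between the two versions posted in this thread is the update condition: `self.iterations % self._accum_steps == 0` in the earlier one and `(self.iterations + 1) % self._accum_steps == 0` in the later one. The sketch below compares the two schedules in plain Python. It assumes the iteration counter starts at 0 and is read before it is incremented for the current step; that is an assumption about the optimizer internals, not something stated in the thread, and `update_steps` is a made-up helper.

```python
def update_steps(num_calls, accum_steps, offset):
    """Counter values (0-based) at which the wrapped optimizer would run."""
    return [i for i in range(num_calls) if (i + offset) % accum_steps == 0]


# Earlier condition: iterations % accum_steps == 0
older = update_steps(num_calls=8, accum_steps=4, offset=0)

# Later condition: (iterations + 1) % accum_steps == 0
newer = update_steps(num_calls=8, accum_steps=4, offset=1)
```

Under that assumption the earlier condition fires on the very first call, after only one batch has been accumulated, while the later one waits for a full window of four.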
This seems to work just fine for training, but I have some strange behaviour when loading the trained model using tf.keras.models.load_model with compile=True.
After training, I just load the model like so:
model = tf.keras.models.load_model(
    model_path,
    custom_objects={
        "GradientAccumulator": GradientAccumulator(optimizer=Adam(ret.learning_rate), accum_steps=ret.accumsteps),
    },
    compile=True,
)
This results in:
TypeError: missing a required argument: 'optimizer'
Probably something silly I am missing here? Any ideas, @fsx950223? I just trained a simple CNN classifier, nothing fancy. Works fine if regular Adam is used. Tested using TF 2.8 and Python 3.8.10.
I managed to get it working by making the following modifications.
It seems the optimizer argument is missing from get_config. I rewrote the config variable to:
config = {"optimizer": self.optimizer, "accum_steps": self._accum_steps}
and added self.optimizer = optimizer to __init__().
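The failure and the fix can be reproduced in miniature without Keras. In the sketch below, `Inner` and `Wrapper` are stand-in classes invented for illustration, and `from_config` simply calls the constructor with the saved config, which is an assumption about how the loader rebuilds the object. Unlike the change described above, this sketch stores the inner optimizer's own config dict rather than the object itself, which is one possible variant.

```python
class Inner:
    """Stand-in for a base optimizer."""

    def __init__(self, learning_rate=0.001):
        self.learning_rate = learning_rate

    def get_config(self):
        return {"learning_rate": self.learning_rate}


class Wrapper:
    """Stand-in for the accumulating wrapper."""

    def __init__(self, optimizer, accum_steps=4):
        # Accept either an instance or a config dict produced by get_config().
        self.optimizer = Inner(**optimizer) if isinstance(optimizer, dict) else optimizer
        self.accum_steps = accum_steps

    def get_config(self):
        # Every required constructor argument must appear here.
        return {"optimizer": self.optimizer.get_config(), "accum_steps": self.accum_steps}

    @classmethod
    def from_config(cls, config):
        return cls(**config)


original = Wrapper(Inner(learning_rate=0.01), accum_steps=8)
restored = Wrapper.from_config(original.get_config())

# Leaving "optimizer" out of the config reproduces the kind of error reported above.
try:
    Wrapper.from_config({"accum_steps": 8})
    error = None
except TypeError as exc:
    error = str(exc)
```

In Keras the analogous helpers would presumably be tf.keras.optimizers.serialize for writing the nested config and tf.keras.optimizers.get for reading it back, though their exact behaviour should be checked against the TF version in use.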
Then, when feeding in the Adam optimizer, I got:
TypeError: type of argument "optimizer" must be one of (keras.optimizer_v2.optimizer_v2.OptimizerV2, str); got dict instead
I believe that is because of the type hint optimizer: types.Optimizer in __init__(). If you pass the Adam optimizer (for instance) as input, it fails because it gets stored as a dict. However, a dict is the expected format, since __init__() calls self._optimizer = tf.keras.optimizers.get(optimizer), and the get method expects either a string or a dict. Hence, I removed the type hint and just take optimizer. There might be a more suitable type hint, but I am not aware of one.
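One way to keep a type hint while still accepting the dict form is a union over the accepted identifier kinds. The sketch below is framework-free and entirely hypothetical: `Adam` here is a stand-in class, and `REGISTRY`, `OptimizerLike`, and `resolve_optimizer` are made-up names that only mimic the instance-or-name-or-config behaviour discussed above.

```python
from typing import Union


class Adam:
    """Stand-in optimizer class for the illustration."""

    def __init__(self, learning_rate=0.001):
        self.learning_rate = learning_rate


REGISTRY = {"adam": Adam}  # hypothetical name-to-class table

# A hint that admits all three identifier kinds.
OptimizerLike = Union[Adam, str, dict]


def resolve_optimizer(identifier: OptimizerLike):
    """Return an optimizer instance from an instance, a name, or a config dict."""
    if isinstance(identifier, tuple(REGISTRY.values())):
        return identifier
    if isinstance(identifier, str):
        return REGISTRY[identifier.lower()]()
    if isinstance(identifier, dict):
        cls = REGISTRY[identifier["class_name"].lower()]
        return cls(**identifier.get("config", {}))
    raise TypeError(f"Cannot interpret optimizer identifier: {identifier!r}")
```

With a hint of this shape, a runtime type checker would let the serialized dict through at load time while still rejecting unrelated types.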
Lastly, in my example, the custom object should be custom_objects={"GradientAccumulator": GradientAccumulator}.
I could post the final, clean version, if of interest. If you are planning on merging this into TF-addons, I could contribute to the PR.
Hi @andreped, I just saw your implementation here (https://github.com/andreped/GradientAccumulator). Great work! I hope it will work fine with a multi-GPU strategy :D.