addons icon indicating copy to clipboard operation
addons copied to clipboard

Gradient accumulate optimizer

Open dathudeptrai opened this issue 3 years ago • 43 comments

Describe the feature and the current behavior/state.

Hi, I think it's good if someone can support Gradient Accumulate optimizer for this repo, this feature is really helpful for those who train the large model with a low resource such as Bert, etc. The usage should be similar with tfa.optimizer.SWA:

opt = ...
accumulate_opt = tfa.optimizer.AccumulationOptimizer(opt, accumulate_steps=5)

There is an implementation of gradient accumulator but for custom training loop rather than Keras model fit here link.

Relevant information

  • Are you willing to contribute it (yes/no): no
  • Are you willing to maintain it going forward? (yes/no): no
  • Is there a relevant academic paper? (if so, where):
  • Is there already an implementation in another framework? (if so, where): here but for custom training loop.
  • Was it part of tf.contrib? (if so, where): no

Which API type would this fall under (layer, metric, optimizer, etc.) optimizer Who will benefit with this feature? all tensorflow users. Any other info.

dathudeptrai avatar Dec 01 '20 09:12 dathudeptrai

@tomerk please bring this up in ecosystem review, though I don't expect any conflicts. For the future do you need us to tag anyone or is the ecosystem-review label sufficient?

seanpmorgan avatar Dec 08 '20 03:12 seanpmorgan

@seanpmorgan https://github.com/tensorflow/tensorflow/issues/32176

bhack avatar Dec 08 '20 09:12 bhack

@tomerk @bhack @seanpmorgan any update :D.

dathudeptrai avatar Dec 10 '20 08:12 dathudeptrai

@bhack just remind :D

dathudeptrai avatar Dec 17 '20 09:12 dathudeptrai

Check https://github.com/tensorflow/addons/pull/2196#issuecomment-747098470

bhack avatar Dec 17 '20 09:12 bhack

Notes from ecosystem review:

Rather than an optimizer that wraps another optimizer, we think this might actually make sense as an object to use as a gradient_transformer in optimizers (a new feature after an optimizer refactoring earlier this year):

https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/Optimizer

Looping in @omalleyt12 who might have more insight / suggestions on how to do this.

Depending on how it comes out it might make sense in core, but addons seems like a good initial spot.

tomerk avatar Dec 17 '20 21:12 tomerk

@omalleyt12 can you take a look ?

dathudeptrai avatar Dec 21 '20 09:12 dathudeptrai

@tomerk @bhack @omalleyt12 a gentle ping in case you missed my previous comment :D

dathudeptrai avatar Dec 24 '20 10:12 dathudeptrai

We have a quite duplicated ticket at https://github.com/tensorflow/tensorflow/issues/32176

bhack avatar Dec 24 '20 10:12 bhack

@bhack it's a long time ago, any plan for this feature for TF2 ?

dathudeptrai avatar Dec 25 '20 03:12 dathudeptrai

It would be great to have such a param:

From a user perspective, having a parameter (e.g. in .fit) like gradient_accumulate_batch_frequency=1 that updates gradients every N batches would be perfect. (1 as default to update every batch, as it is at the moment)

chn-lee-yumi avatar Jan 15 '21 18:01 chn-lee-yumi

IMO, this feature is not needed as we can implement gradient accumulation in the custom training in tf 2. Link.

innat avatar Mar 04 '21 23:03 innat

@innat we need it in case we want to use tf.keras model fit function :))).

dathudeptrai avatar Mar 05 '21 02:03 dathudeptrai

That should also possible to achieve by overriding the train_step method, customizing the .fit function.

innat avatar Mar 05 '21 04:03 innat

That should also possible to achieve by overriding the train_step method, customizing the .fit function.

overriding train_step can be considered as custom_training_loop. We need plug and play module and it should be apply for all model without custom train_step.

dathudeptrai avatar Mar 05 '21 05:03 dathudeptrai

If you want to plug and play, then try this. https://github.com/CyberZHG/keras-gradient-accumulation

innat avatar Mar 05 '21 05:03 innat

Overriding the train_step doesn't necessarily refer to a custom training loop, link. This is the leverage we get from new tf.keras, it should be adopted. Being too much plug-and-play stuff can bring lots of breakdown with mid or high-level packages update.

innat avatar Mar 05 '21 05:03 innat

@innat i know train_step function in tf.keras. I also implement gradient accumulate in my framework (https://github.com/TensorSpeech/TensorFlowTTS). I'm not talking about how to implement it, I'm talking about that we need this module in this repo (ez maintain, stable and adapt with new TF version.). I can implement GA in both train_step and custom_training_loop but if we have a GA wrapper module for all base optimizer it would be better :D .

dathudeptrai avatar Mar 05 '21 05:03 dathudeptrai

@dathudeptrai understood. It sounds great then. However, I'm facing some issues with implementing GA by customizing the .fit. In case you're interested, please have a look here https://github.com/tensorflow/tensorflow/issues/47578.

update

Solved: https://gist.github.com/innat/ba6740293e7b7b227829790686f2119c

innat avatar Mar 05 '21 06:03 innat

The above gradient accumulation implementation doesn;t work with TF2.5 with multi GPU distribution strategy strategy = tf.distribute.MultiWorkerMirroredStrategy()

ajakoby avatar Jul 14 '21 12:07 ajakoby

# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import tensorflow as tf
from tensorflow_addons.utils import types
from typeguard import typechecked


class GradientAccumulator(tf.keras.optimizers.Optimizer):
    """Optimizer wrapper for gradient accumulation."""
    @typechecked
    def __init__(
        self,
        optimizer: types.Optimizer,
        accum_steps: types.TensorLike = 4,
        name: str = "GradientAccumulator",
        **kwargs,
    ):
        r"""Construct a new GradientAccumulator optimizer.

        Args:
            optimizer: str or `tf.keras.optimizers.Optimizer` that will be
                used to compute and apply gradients.
            accum_steps: int > 0. Update gradient in every accumulation steps.
            name: Optional name for the operations created when applying
                gradients. Defaults to "GradientAccumulator".
            **kwargs: keyword arguments. Allowed to be {`clipnorm`,
                `clipvalue`, `lr`, `decay`}. `clipnorm` is clip gradients by
                norm; `clipvalue` is clip gradients by value, `decay` is
                included for backward compatibility to allow time inverse
                decay of learning rate. `lr` is included for backward
                compatibility, recommended to use `learning_rate` instead.
        """
        super().__init__(name, **kwargs)
        self._optimizer = tf.keras.optimizers.get(optimizer)
        self._gradients = []
        self._accum_steps = accum_steps

    def _create_slots(self, var_list):
        self._optimizer._create_slots(var_list=var_list)
        for var in var_list:
            self.add_slot(var, "ga")

        self._gradients = [self.get_slot(var, "ga") for var in var_list]

    @property
    def gradients(self):
        """The accumulated gradients on the current replica."""
        if not self._gradients:
            raise ValueError(
                "The accumulator should be called first to initialize the gradients"
            )
        return list(
            gradient.read_value() if gradient is not None else gradient
            for gradient in self._gradients
        )

    def apply_gradients(self, grads_and_vars, name=None, **kwargs):
        self._optimizer._iterations = self.iterations
        return super().apply_gradients(grads_and_vars, name, **kwargs)

    def _resource_apply_dense(self, grad, var, apply_state=None):
        accum_gradient = self.get_slot(var, "ga")
        if accum_gradient is not None and grad is not None:
            accum_gradient.assign_add(
                grad, use_locking=self._use_locking, read_value=False
            )

        def _apply():
            if "apply_state" in self._optimizer._dense_apply_args:
                train_op = self._optimizer._resource_apply_dense(
                    accum_gradient.read_value(), var, apply_state=apply_state
                )
            else:
                train_op = self._optimizer._resource_apply_dense(
                    accum_gradient.read_value(), var
                )
            reset_op = accum_gradient.assign(
                        tf.zeros_like(accum_gradient),
                        use_locking=self._use_locking,
                        read_value=False,
                    )
            return tf.group(train_op, reset_op)

        apply_op = tf.cond(
            (self.iterations+1) % self._accum_steps == 0, _apply, lambda: tf.no_op()
        )
        return apply_op

    def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_state):
        accum_gradient = self.get_slot(var, "ga")
        if accum_gradient is not None and grad is not None:
            self._resource_scatter_add(accum_gradient, indices, grad)

        def _apply():
            if "apply_state" in self._optimizer._sparse_apply_args:
                train_op = self._optimizer._resource_apply_sparse(
                    accum_gradient.sparse_read(indices),
                    var,
                    indices,
                    apply_state=apply_state,
                )
            else:
                train_op = self._optimizer._resource_apply_sparse(
                    accum_gradient.sparse_read(indices), var, indices
                )
            reset_op = accum_gradient.assign(
                        tf.zeros_like(accum_gradient),
                        use_locking=self._use_locking,
                        read_value=False,
                    )
            return tf.group(train_op, reset_op)

        apply_op = tf.cond(
            (self.iterations+1) % self._accum_steps == 0, _apply, lambda: tf.no_op()
        )
        return apply_op

    def reset(self):
        """Resets the accumulated gradients on the current replica."""
        assign_ops = []
        if not self._gradients:
            return assign_ops

        for gradient in self._gradients:
            if gradient is not None:
                assign_ops.append(
                    gradient.assign(
                        tf.zeros_like(gradient),
                        use_locking=self._use_locking,
                        read_value=False,
                    )
                )

        return tf.group(assign_ops)

    @property
    def lr(self):
        return self._optimizer._get_hyper("learning_rate")

    @lr.setter
    def lr(self, lr):
        self._optimizer._set_hyper("learning_rate", lr)  #

    @property
    def learning_rate(self):
        return self._optimizer._get_hyper("learning_rate")

    @learning_rate.setter
    def learning_rate(self, learning_rate):
        self._optimizer._set_hyper("learning_rate", learning_rate)

    def get_config(self):
        config = {"accum_steps": self._accum_steps}
        base_config = super().get_config()
        return {**base_config, **config}



mnist = tf.keras.datasets.mnist

(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(10)
])

loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer=GradientAccumulator(tf.keras.optimizers.Adam(), accum_steps=4),
              loss=loss_fn,
              metrics=['accuracy'])
model.fit(x_train, y_train, epochs=5)

Here is my implementation

fsx950223 avatar Jul 15 '21 02:07 fsx950223

@fsx950223 Many thanks. If it's stable, could you make a pull request to support this feature for tensorflow_addon ?

dathudeptrai avatar Jul 15 '21 02:07 dathudeptrai

@fsx950223 Many thanks. If it's stable, could you make a pull request to support this feature for tensorflow_addon ?

I'm not sure, maybe you could test it.

fsx950223 avatar Jul 15 '21 02:07 fsx950223

import tensorflow as tf
from tensorflow_addons.utils import types
from typeguard import typechecked

class GradientAccumulator(tf.keras.optimizers.Optimizer):
  """Gradient accumulation utility.
  When used with a distribution strategy, the accumulator should be called in a
  replica context. Gradients will be accumulated locally on each replica and
  without synchronization. Users should then call ``.gradients``, scale the
  gradients if required, and pass the result to ``apply_gradients``.
  """
  @typechecked
  def __init__(self, optimizer: types.Optimizer, accum_steps: types.TensorLike = 4, name: str = 'gradient_accumulator',
               **kwargs):
    """Initializes the accumulator."""
    super().__init__(name, **kwargs)
    self._optimizer = tf.keras.optimizers.get(optimizer)
    self._gradients = []
    self._accum_steps = accum_steps

  def _create_slots(self, var_list):
    self._optimizer._create_slots(var_list=var_list)
    for var in var_list:
      self.add_slot(var, "ga")

    self._gradients = [self.get_slot(var, "ga") for var in var_list]

  @property
  def gradients(self):
    """The accumulated gradients on the current replica."""
    if not self._gradients:
      raise ValueError(
        "The accumulator should be called first to initialize the gradients"
      )
    return list(
      gradient.value() if gradient is not None else gradient
      for gradient in self._gradients
    )

  def apply_gradients(self, grads_and_vars, name=None, **kwargs):
    self._optimizer._iterations = self.iterations
    return super().apply_gradients(grads_and_vars, name, **kwargs)

  def _resource_apply_dense(self, grad, var, apply_state=None):
    accum_gradient = self.get_slot(var, 'ga')
    if accum_gradient is not None and grad is not None:
      accum_gradient.assign_add(grad, use_locking=self._use_locking, read_value=False)
    def _apply():
      if "apply_state" in self._optimizer._dense_apply_args:
        train_op = self._optimizer._resource_apply_dense(
          accum_gradient, var, apply_state=apply_state
        )
      else:
        train_op = self._optimizer._resource_apply_dense(accum_gradient, var)
      reset_op = self.reset()
      return tf.group(train_op, reset_op)
    apply_op = tf.cond(self.iterations % self._accum_steps == 0, _apply,
                       lambda: tf.no_op())
    return apply_op

  def reset(self):
    """Resets the accumulated gradients on the current replica."""
    assign_ops = []
    if not self._gradients:
      return assign_ops

    for gradient in self._gradients:
      if gradient is not None:
        assign_ops.append(gradient.assign(tf.zeros_like(gradient), use_locking=self._use_locking, read_value=False))

    return tf.group(assign_ops)

  @property
  def lr(self):
    return self._optimizer._get_hyper("learning_rate")

  @lr.setter
  def lr(self, lr):
    self._optimizer._set_hyper("learning_rate", lr)  #

  @property
  def learning_rate(self):
    return self._optimizer._get_hyper("learning_rate")

  @learning_rate.setter
  def learning_rate(self, learning_rate):
    self._optimizer._set_hyper("learning_rate", learning_rate)

  def get_config(self):
    config = {
      "accum_steps": self.accum_steps
    }
    base_config = super().get_config()
    return {**base_config, **config}


mnist = tf.keras.datasets.mnist

(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(10)
])
predictions = model(x_train[:1]).numpy()
# tf.nn.softmax(predictions).numpy()
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer=GradientAccumulator(tf.keras.optimizers.Adam()),
              loss=loss_fn,
              metrics=['accuracy'])
model.fit(x_train, y_train, epochs=5)

Here is my implementation

When running this code on TF2.5 keras (run_eagerly=False) with MultiWorkerMirroredStrategy on 8 GPUs the time to train is ~2 times slower than running w/o gradient accumulation ( using the Class when accum_step=1). Do you know what is the reason for this x2 slowdown , the time to train should have stayed at the same.

ajakoby avatar Jul 15 '21 06:07 ajakoby

Could you provide tensorflow profiles?

fsx950223 avatar Jul 15 '21 06:07 fsx950223

I found the default setting is faster than accum_steps=1 on a single device.

fsx950223 avatar Jul 15 '21 06:07 fsx950223

You could test the code on my PR which I have fixed several bugs.

fsx950223 avatar Jul 15 '21 09:07 fsx950223

# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import tensorflow as tf
from tensorflow_addons.utils import types
from typeguard import typechecked


class GradientAccumulator(tf.keras.optimizers.Optimizer):
    """Optimizer wrapper for gradient accumulation."""
    @typechecked
    def __init__(
        self,
        optimizer: types.Optimizer,
        accum_steps: types.TensorLike = 4,
        name: str = "GradientAccumulator",
        **kwargs,
    ):
        r"""Construct a new GradientAccumulator optimizer.

        Args:
            optimizer: str or `tf.keras.optimizers.Optimizer` that will be
                used to compute and apply gradients.
            accum_steps: int > 0. Update gradient in every accumulation steps.
            name: Optional name for the operations created when applying
                gradients. Defaults to "GradientAccumulator".
            **kwargs: keyword arguments. Allowed to be {`clipnorm`,
                `clipvalue`, `lr`, `decay`}. `clipnorm` is clip gradients by
                norm; `clipvalue` is clip gradients by value, `decay` is
                included for backward compatibility to allow time inverse
                decay of learning rate. `lr` is included for backward
                compatibility, recommended to use `learning_rate` instead.
        """
        super().__init__(name, **kwargs)
        self._optimizer = tf.keras.optimizers.get(optimizer)
        self._gradients = []
        self._accum_steps = accum_steps

    def _create_slots(self, var_list):
        self._optimizer._create_slots(var_list=var_list)
        for var in var_list:
            self.add_slot(var, "ga")

        self._gradients = [self.get_slot(var, "ga") for var in var_list]

    @property
    def gradients(self):
        """The accumulated gradients on the current replica."""
        if not self._gradients:
            raise ValueError(
                "The accumulator should be called first to initialize the gradients"
            )
        return list(
            gradient.read_value() if gradient is not None else gradient
            for gradient in self._gradients
        )

    def apply_gradients(self, grads_and_vars, name=None, **kwargs):
        self._optimizer._iterations = self.iterations
        return super().apply_gradients(grads_and_vars, name, **kwargs)

    def _resource_apply_dense(self, grad, var, apply_state=None):
        accum_gradient = self.get_slot(var, "ga")
        if accum_gradient is not None and grad is not None:
            accum_gradient.assign_add(
                grad, use_locking=self._use_locking, read_value=False
            )

        def _apply():
            if "apply_state" in self._optimizer._dense_apply_args:
                train_op = self._optimizer._resource_apply_dense(
                    accum_gradient.read_value(), var, apply_state=apply_state
                )
            else:
                train_op = self._optimizer._resource_apply_dense(
                    accum_gradient.read_value(), var
                )
            reset_op = accum_gradient.assign(
                        tf.zeros_like(accum_gradient),
                        use_locking=self._use_locking,
                        read_value=False,
                    )
            return tf.group(train_op, reset_op)

        apply_op = tf.cond(
            (self.iterations+1) % self._accum_steps == 0, _apply, lambda: tf.no_op()
        )
        return apply_op

    def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_state):
        accum_gradient = self.get_slot(var, "ga")
        if accum_gradient is not None and grad is not None:
            self._resource_scatter_add(accum_gradient, indices, grad)

        def _apply():
            if "apply_state" in self._optimizer._sparse_apply_args:
                train_op = self._optimizer._resource_apply_sparse(
                    accum_gradient.sparse_read(indices),
                    var,
                    indices,
                    apply_state=apply_state,
                )
            else:
                train_op = self._optimizer._resource_apply_sparse(
                    accum_gradient.sparse_read(indices), var, indices
                )
            reset_op = accum_gradient.assign(
                        tf.zeros_like(accum_gradient),
                        use_locking=self._use_locking,
                        read_value=False,
                    )
            return tf.group(train_op, reset_op)

        apply_op = tf.cond(
            (self.iterations+1) % self._accum_steps == 0, _apply, lambda: tf.no_op()
        )
        return apply_op

    def reset(self):
        """Resets the accumulated gradients on the current replica."""
        assign_ops = []
        if not self._gradients:
            return assign_ops

        for gradient in self._gradients:
            if gradient is not None:
                assign_ops.append(
                    gradient.assign(
                        tf.zeros_like(gradient),
                        use_locking=self._use_locking,
                        read_value=False,
                    )
                )

        return tf.group(assign_ops)

    @property
    def lr(self):
        return self._optimizer._get_hyper("learning_rate")

    @lr.setter
    def lr(self, lr):
        self._optimizer._set_hyper("learning_rate", lr)  #

    @property
    def learning_rate(self):
        return self._optimizer._get_hyper("learning_rate")

    @learning_rate.setter
    def learning_rate(self, learning_rate):
        self._optimizer._set_hyper("learning_rate", learning_rate)

    def get_config(self):
        config = {"accum_steps": self._accum_steps}
        base_config = super().get_config()
        return {**base_config, **config}



mnist = tf.keras.datasets.mnist

(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(10)
])

loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer=GradientAccumulator(tf.keras.optimizers.Adam(), accum_steps=4),
              loss=loss_fn,
              metrics=['accuracy'])
model.fit(x_train, y_train, epochs=5)

Here is my implementation

This seems to work just fine for training, but I have some strange behaviour when loading the trained model using tf.keras.models.load_model with compile=True.

After training, I just load the model like so:

model = tf.keras.models.load_model(
        model_path,
        custom_objects={
            "GradientAccumulator": GradientAccumulator(optimizer=Adam(ret.learning_rate), accum_steps=ret.accumsteps),
        },
        compile=True,
    )

This results in:

TypeError: missing a required argument: 'optimizer'

Probably something silly I am missing here? Any ideas, @fsx950223? I just trained a simple CNN classifier, nothing fancy. Works fine if regular Adam is used. Tested using TF 2.8 and Python 3.8.10.

andreped avatar May 25 '22 08:05 andreped

I managed to get it working by doing the following modifications.

Seems like the optimizer argument is missing in the get_config. I rewrote the config variable to:

config = {"optimizer": self.optimizer, "accum_steps": self._accum_steps}

and added self.optimizer = optimizer to __init__().

Then when feeding the Adam optimizer, I got that:

TypeError: type of argument "optimizer" must be one of (keras.optimizer_v2.optimizer_v2.OptimizerV2, str); got dict instead

I believe that is because of the type hint optimizer: types.Optimizer in __init__(). If you wish to send the Adam optimizer (for instance) as input, it will fail as it will be stored as a Dict. However, that is the expected format, as you do self._optimizer=tf.keras.optimizers.get(optimizer) in the __init__(), where the get method expects either a string or Dict. Hence, I removed the type hint for the optimizer to just get optimizer. Might be a more preferable type hint, but I am not aware of any.

Lastly, in my example, the custom object should be custom_objects={"GradientAccumulator": GradientAccumulator}.

I could post the final, clean version, if of interest. If you are planning on merging this into TF-addons, I could contribute to the PR.

andreped avatar May 25 '22 08:05 andreped

Hi @andreped , I just saw your implementation here (https://github.com/andreped/GradientAccumulator). Great work! and I hope it will work fine with multi-gpu strategy :D .

dathudeptrai avatar May 31 '22 15:05 dathudeptrai