[RFE] Removing HVD nodes from SavedModels

Open DEKHTIARJonathan opened this issue 3 years ago • 6 comments

CC: @nluehr @bixia1 @tgaddair

  • Horovod Version : 0.20.2
  • Tensorflow: 1.15.4

Problem description

When users train a model with Horovod, leftover HVD nodes can, in certain scenarios, end up in the SavedModel.

This is a problem because:

  • Inference workers must have the following installed:
    • Horovod
    • MPI, if HVD is not built with Gloo support

This makes little sense, since Horovod is not meant to be used for inference.

How to reproduce:

Let's create a file train.py:

import os
import errno
import tensorflow as tf
import horovod.tensorflow as hvd
import numpy as np
import argparse

from tensorflow import keras

layers = tf.layers

tf.logging.set_verbosity(tf.logging.INFO)

# Training settings
parser = argparse.ArgumentParser(description='Tensorflow MNIST Example')
parser.add_argument('--use-adasum', action='store_true', default=False,
                    help='use adasum algorithm to do reduction')
parser.add_argument('--gradient-predivide-factor', type=float, default=1.0,
                    help='apply gradient predivide factor in optimizer (default: 1.0)')
args = parser.parse_args()

def conv_model(feature, target, mode):
    """2-layer convolution model."""
    # Convert the target to a one-hot tensor of shape (batch_size, 10) and
    # with a on-value of 1 for each one-hot vector of length 10.
    target = tf.one_hot(tf.cast(target, tf.int32), 10, 1, 0)

    # Reshape feature to 4d tensor with 2nd and 3rd dimensions being
    # image width and height final dimension being the number of color channels.
    feature = tf.reshape(feature, [-1, 28, 28, 1])

    # First conv layer will compute 32 features for each 5x5 patch
    with tf.variable_scope('conv_layer1'):
        h_conv1 = layers.conv2d(feature, 32, kernel_size=[5, 5],
                                activation=tf.nn.relu, padding="SAME")
        h_pool1 = tf.nn.max_pool(
            h_conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

    # Second conv layer will compute 64 features for each 5x5 patch.
    with tf.variable_scope('conv_layer2'):
        h_conv2 = layers.conv2d(h_pool1, 64, kernel_size=[5, 5],
                                activation=tf.nn.relu, padding="SAME")
        h_pool2 = tf.nn.max_pool(
            h_conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
        # reshape tensor into a batch of vectors
        h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64])

    # Densely connected layer with 1024 neurons.
    h_fc1 = layers.dropout(
        layers.dense(h_pool2_flat, 1024, activation=tf.nn.relu),
        rate=0.5, training=mode == tf.estimator.ModeKeys.TRAIN)

    # Compute logits (1 per class) and compute loss.
    logits = layers.dense(h_fc1, 10, activation=None)
    loss = tf.losses.softmax_cross_entropy(target, logits)

    return tf.argmax(logits, 1), loss


def train_input_generator(x_train, y_train, batch_size=64):
    assert len(x_train) == len(y_train)
    while True:
        p = np.random.permutation(len(x_train))
        x_train, y_train = x_train[p], y_train[p]
        index = 0
        while index <= len(x_train) - batch_size:
            yield x_train[index:index + batch_size], \
                  y_train[index:index + batch_size],
            index += batch_size


def main(_):
    # Horovod: initialize Horovod.
    hvd.init()

    # Keras automatically creates a cache directory in ~/.keras/datasets for
    # storing the downloaded MNIST data. This creates a race
    # condition among the workers that share the same filesystem. If the
    # directory already exists by the time this worker gets around to creating
    # it, ignore the resulting exception and continue.
    cache_dir = os.path.join(os.path.expanduser('~'), '.keras', 'datasets')
    if not os.path.exists(cache_dir):
        try:
            os.mkdir(cache_dir)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(cache_dir):
                pass
            else:
                raise

    # Download and load MNIST dataset.
    (x_train, y_train), (x_test, y_test) = \
        keras.datasets.mnist.load_data('MNIST-data-%d' % hvd.rank())

    # The shape of downloaded data is (-1, 28, 28), hence we need to reshape it
    # into (-1, 784) to feed into our network. Also, need to normalize the
    # features between 0 and 1.
    x_train = np.reshape(x_train, (-1, 784)) / 255.0
    x_test = np.reshape(x_test, (-1, 784)) / 255.0

    # Build model...
    with tf.name_scope('input'):
        image = tf.placeholder(tf.float32, [None, 784], name='image')
        label = tf.placeholder(tf.float32, [None], name='label')
    predict, loss = conv_model(image, label, tf.estimator.ModeKeys.TRAIN)

    lr_scaler = hvd.size()
    # By default, Adasum doesn't need scaling when increasing batch size. If used with NCCL,
    # scale lr by local_size
    if args.use_adasum:
        lr_scaler = hvd.local_size() if hvd.nccl_built() else 1

    # Horovod: adjust learning rate based on lr_scaler.
    opt = tf.train.AdamOptimizer(0.001 * lr_scaler)

    # Horovod: add Horovod Distributed Optimizer.
    opt = hvd.DistributedOptimizer(opt, op=hvd.Adasum if args.use_adasum else hvd.Average,
                                   gradient_predivide_factor=args.gradient_predivide_factor)

    global_step = tf.train.get_or_create_global_step()
    train_op = opt.minimize(loss, global_step=global_step)

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())

    # Horovod: save checkpoints only on worker 0 to prevent other workers from
    # corrupting them.
    checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None
    training_batch_generator = train_input_generator(x_train,
                                                     y_train, batch_size=100)
 
    with tf.Session(config=config) as sess:
        
        sess.run(tf.global_variables_initializer())
        sess.run(hvd.broadcast_global_variables(0))
    
        for step in range(1, 201):
            image_, label_ = next(training_batch_generator)
            sess.run(train_op, feed_dict={image: image_, label: label_})

            if step % 10 == 0:
                print("Running step {}/200".format(step))
 
        # Export the SavedModel from rank 0 only.
        if hvd.rank() == 0:
            # sess.graph._unsafe_unfinalize()
            input_nodes = [("input", image)]
            output_nodes = [("preds", predict)]

            tf.saved_model.simple_save(
                sess,
                "saved_models",
                inputs={name: node for name, node in input_nodes},
                outputs={name: node for name, node in output_nodes}
            )


if __name__ == "__main__":
    tf.app.run()

Let's also create a file inference.py:

import tensorflow as tf
# import horovod.tensorflow as hvd  # Uncommenting this line fixes the issue

with tf.Session(graph=tf.Graph()) as sess:
    tf.saved_model.loader.load(sess, ["serve"], "saved_models")
    graph = tf.get_default_graph()
    print(graph.get_operations())

Now let's execute everything:

horovodrun -np 1 python train.py

[...]   # wait for the process to finish

python inference.py

Traceback (most recent call last):
  File "inference.py", line 4, in <module>
    tf.saved_model.loader.load(sess, ["serve"], "saved_models")
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/util/deprecation.py", line 330, in new_func
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/saved_model/loader_impl.py", line 269, in load
    return loader.load(sess, tags, import_scope, **saver_kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/saved_model/loader_impl.py", line 422, in load
    **saver_kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/saved_model/loader_impl.py", line 352, in load_graph
    meta_graph_def, import_scope=import_scope, **saver_kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/saver.py", line 1477, in _import_meta_graph_with_return_elements
    **kwargs))
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/meta_graph.py", line 809, in import_scoped_meta_graph_with_return_elements
    return_elements=return_elements)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/util/deprecation.py", line 513, in new_func
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/importer.py", line 405, in import_graph_def
    producer_op_list=producer_op_list)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/importer.py", line 501, in _import_graph_def_internal
    graph._c_graph, serialized, options)  # pylint: disable=protected-access
tensorflow.python.framework.errors_impl.NotFoundError: Op type not registered 'HorovodAllreduce' in binary running on Jonathan-NVWS. Make sure the Op and Kernel are registered in the binary running in this process. Note that if you are loading a saved graph which used ops from tf.contrib, accessing (e.g.) `tf.contrib.resampler` should be done before importing the graph, as contrib ops are lazily registered when the module is first accessed.

Possible directions:

  • Monkey patching TF APIs to transparently remove the HVD nodes during saving.
  • Providing an API like hvd.prepare_savedmodel_for_inference()
  • Providing a command line tool that removes HVD nodes from a SavedModel
  • Pushing a PR on the TF side that filters out these nodes
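
To illustrate what "removing the HVD nodes" could mean in practice, here is a minimal sketch of output-driven pruning: keep only the nodes reachable backwards from the inference outputs, so that anything used only by the training step (gradients, allreduce, optimizer updates) is dropped. It works on a toy graph stored as a plain dict, not on a real GraphDef; the names prune_to_outputs and toy_graph and the op labels are hypothetical. TF 1.x offers tf.graph_util.extract_sub_graph for the same kind of pruning on a real GraphDef, but whether that alone is enough for a SavedModel is an assumption not verified here.

```python
from collections import deque


def prune_to_outputs(nodes, output_names):
    """Keep only the nodes reachable backwards from the given outputs.

    `nodes` maps a node name to a pair (op_type, [input node names]).
    This is a simplified stand-in for a GraphDef, for illustration only.
    """
    keep = set()
    queue = deque(output_names)
    while queue:
        name = queue.popleft()
        if name in keep:
            continue
        keep.add(name)
        # Walk from the node to everything it consumes.
        queue.extend(nodes[name][1])
    return {name: spec for name, spec in nodes.items() if name in keep}


# Toy training graph: the Horovod op only feeds the training step.
toy_graph = {
    "image": ("Placeholder", []),
    "label": ("Placeholder", []),
    "logits": ("MatMul", ["image"]),
    "preds": ("ArgMax", ["logits"]),
    "loss": ("SoftmaxCrossEntropy", ["logits", "label"]),
    "grad": ("Gradient", ["loss"]),
    "allreduce": ("HorovodAllreduce", ["grad"]),
    "train_op": ("ApplyAdam", ["allreduce"]),
}

pruned = prune_to_outputs(toy_graph, ["preds"])
leftover = [name for name, (op, _) in pruned.items() if op.startswith("Horovod")]

print(sorted(pruned))
print(leftover)
```

Because the Horovod op sits only on the gradient path in this toy graph, pruning from "preds" removes it without any rewiring. A Horovod op on the forward path would need a different treatment.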

DEKHTIARJonathan avatar Nov 06 '20 21:11 DEKHTIARJonathan

I also just hit this issue.

I have a performance analysis infrastructure I developed for studying networks in training mode. The first step is to create a saved model file which includes all the training state. Then we take the saved model files and replay them in the TF analysis code. The issue just appeared when I moved to a more recent TF version (Tensorflow: 1.15.2)

My network applies Horovod to the optimizer as in the example below. In my specific case I'm only measuring performance on a single GPU, so the analysis software doesn't have any explicit support for HVD.

optimizer = hvd.DistributedOptimizer(optimizer)

I get the following error when I load the saved model.

tensorflow.python.framework.errors_impl.NotFoundError: Op type not registered 'HorovodAllreduce' in binary running on ef9144e838cd. Make sure the Op and Kernel are registered in the binary running in this process. Note that if you are loading a saved graph which used ops from tf.contrib, accessing (e.g.) tf.contrib.resampler should be done before importing the graph, as contrib ops are lazily registered when the module is first accessed.

I tried the fix suggested by @DEKHTIARJonathan and it worked. I added the import statement below to my analysis code.

import horovod.tensorflow as hvd
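
If the loading script also has to run on machines without Horovod, the import can be guarded. The following is only a sketch of that idea: register_horovod_ops is a hypothetical helper name, and it assumes, as reported in this thread, that importing horovod.tensorflow is what makes the Horovod ops known to TensorFlow.

```python
import importlib


def register_horovod_ops():
    """Try to import horovod.tensorflow so its custom ops get registered.

    Returns True if the import succeeded, False if Horovod (or one of its
    dependencies) is not importable in this environment.
    """
    try:
        importlib.import_module("horovod.tensorflow")
    except ImportError:
        return False
    return True


ops_registered = register_horovod_ops()
print("Horovod ops registered:", ops_registered)
```

When this returns False, loading a SavedModel that still contains HorovodAllreduce nodes should be expected to fail with the NotFoundError shown above.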

alexmsettle avatar Nov 13 '20 22:11 alexmsettle

I am getting the same error while importing saved model for UNet Industrial in TF. @DEKHTIARJonathan's suggestion of adding import horovod.tensorflow as hvd fixed the error.

nvgarvitk avatar Jan 29 '21 04:01 nvgarvitk

@DEKHTIARJonathan I get the following error after doing the change import horovod.tensorflow as hvd

NodeDef missing attrs 'postscale_factor', 'prescale_factor', 'reduce_op' from Op<name=HorovodAllreduce; signature=tensor:T -> sum:T; attr=T:type,allowed=[DT_INT32, DT_INT64, DT_HALF, DT_FLOAT, DT_DOUBLE]; attr=reduce_op:int; attr=prescale_factor:float; attr=postscale_factor:float; attr=ignore_name_scope:bool,default=false>; NodeDef: {{node Loss_Optimization/all_reduce/HorovodAllreduce_Loss_Optimization_mul_1_0}}

Could you please help

sandeep3sai avatar Aug 11 '21 11:08 sandeep3sai

Have you solved the problem? I am getting the same error after doing the change import horovod.tensorflow as hvd

zwlanpishu avatar Jan 04 '22 08:01 zwlanpishu

I am also seeing the same error even after the import.

ramkrishna1121 avatar Jan 18 '22 18:01 ramkrishna1121

@sandeep3sai @zwlanpishu @ramkrishna1121 You are seeing this error because your saved_model file was created before the "scale factor" feature was added to Horovod in v0.20.0, so the Horovod op signature is different. A workaround is to use a prior version of Horovod before running your script:

pip install horovod==0.19.5
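
To check which side of that change an environment is on, a small version comparison can help. This is a sketch only: has_scale_factor_attrs is a hypothetical helper, the 0.20.0 threshold is taken from the comment above, and the version string is assumed to come from something like pip show horovod.

```python
import re


def parse_version(text):
    """Turn a version string such as '0.20.2' into a tuple of ints.

    Only the leading digits of each of the first three components are used,
    so a suffix like 'rc1' is ignored.
    """
    parts = []
    for piece in text.split(".")[:3]:
        match = re.match(r"\d+", piece)
        parts.append(int(match.group()) if match else 0)
    return tuple(parts)


def has_scale_factor_attrs(horovod_version):
    """True if this Horovod version is 0.20.0 or later.

    Per the comment above, these versions expect the scale factor
    attributes on HorovodAllreduce nodes.
    """
    return parse_version(horovod_version) >= (0, 20, 0)


print(has_scale_factor_attrs("0.19.5"))
print(has_scale_factor_attrs("0.20.2"))
```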

nvcastet avatar Jan 18 '22 23:01 nvcastet