sagemaker-tensorflow-training-toolkit

Support different tf.distribute.Strategies for distributed training on SageMaker

Open anirudhacharya opened this issue 5 years ago • 14 comments

For strategies like MultiWorkerMirroredStrategy, TF2 requires each node in the cluster to be configured individually (https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#multi-worker_configuration). Currently, SageMaker does not provide a way of doing this when launching a distributed training job with MultiWorkerMirroredStrategy via the estimator.fit() method.

anirudhacharya avatar Jun 23 '20 21:06 anirudhacharya

@laurenyu If I have to set things up myself, how can I use the estimator.fit() API to launch the training job? Can't the TF_CONFIG be configured with the distributions parameter (like here)?

If you don't specify anything for distributions when creating your estimator, then no TF_CONFIG is set. You can then write your own config in your training script. estimator.fit() shouldn't interfere, since all it does is launch the training job, which then runs your script.

laurenyu avatar Jun 23 '20 21:06 laurenyu

@laurenyu the config is different for different nodes in the cluster. Are you suggesting we do something like this -

# Start of train.py

if getHostname() == "algo-1":
  # write TF_CONFIG for node-1
elif getHostname() == "algo-2":
  # write TF_CONFIG for node-2

import tensorflow
# Start the actual training script

anirudhacharya avatar Jun 23 '20 22:06 anirudhacharya

@anirudhacharya yep, that's exactly what I was thinking. You can also use the environment variable SM_CURRENT_HOST to get the host name (docs).
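
For reference, a minimal sketch of reading those values inside a SageMaker training script (SM_HOSTS is a JSON-encoded list of all host names, SM_CURRENT_HOST is the name of the host the script is running on):

import json
import os

hosts = json.loads(os.environ["SM_HOSTS"])      # e.g. ["algo-1", "algo-2"]
current_host = os.environ["SM_CURRENT_HOST"]    # e.g. "algo-1"
node_index = hosts.index(current_host)          # per-node index, usable as the TF_CONFIG task index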

laurenyu avatar Jun 23 '20 23:06 laurenyu

@laurenyu I can try this, but I am not sure it will work, because TF_CONFIG looks something like this -

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': [<list of addresses & ports of the nodes that make up the cluster>]
    },
    'task': {'type': 'worker', 'index': 0}
})

While with a conventional cluster setup I can SSH into each node and get information like the IP address and port number, I am not sure how I would be able to do that from within the training script.

anirudhacharya avatar Jun 24 '20 01:06 anirudhacharya

Did you give it a try? Hostname + port should work.

nadiaya avatar Jul 10 '20 19:07 nadiaya

@nadiaya

While using the parameter server distribution type in the estimator.fit() call, I came across SageMaker's TF_CONFIG in the logs, such as the following (just an example for the parameter server distribution strategy):

TF_CONFIG={
        "cluster":
            {"master": ["algo-1:2222"],
             "ps": ["algo-1:2223", "algo-2:2223"],
             "worker": ["algo-2:2222"]},
        "environment": "cloud",
        "task": {"index": 0, "type": "master"}}

Questions:

  1. In our training scripts, how do we access the port numbers of the workers on each algo (host/instance)? I know we can use os.environ['SM_HOSTS'] to find the hosts, but what about the port numbers for each algo (host/instance), like 2222 and 2223 in the example above?
  2. If we are able to get the port numbers, then it's just a matter of defining the right TF_CONFIG to implement MultiWorkerMirroredStrategy, right?
  3. A working example for MultiWorkerMirroredStrategy using 2 or more multi-GPU instances (ml.p3.8xlarge or bigger) would be highly appreciated.

Thank you

maddy392 avatar Jul 08 '21 19:07 maddy392

Sharing an implementation of a working TF_CONFIG for MultiWorkerMirroredStrategy below. This has been tested on a SageMaker Deep Learning Container with TensorFlow v2.8 (link to dockerfile).

import json
import os


def _build_tf_config():
    # SM_HOSTS is a JSON-encoded list of all host names in the training cluster;
    # SM_CURRENT_HOST is the name of the host this process runs on.
    hosts = json.loads(os.getenv("SM_HOSTS"))
    current_host = os.getenv("SM_CURRENT_HOST")

    workers = hosts

    def host_addresses(hosts, port=7777):
        # any free port works, as long as the same port is used on every host
        return ["{}:{}".format(host, port) for host in hosts]

    tf_config = {"cluster": {}, "task": {}}
    tf_config["cluster"]["worker"] = host_addresses(workers)
    tf_config["task"] = {"index": workers.index(current_host), "type": "worker"}

    os.environ["TF_CONFIG"] = json.dumps(tf_config)

    return
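
A usage sketch, assuming the helper above is called from the training script: TF_CONFIG has to be exported before the strategy is constructed, otherwise TensorFlow raises the "Collective ops must be configured at program startup" error discussed further down.

import tensorflow as tf

_build_tf_config()                                      # export TF_CONFIG first
strategy = tf.distribute.MultiWorkerMirroredStrategy()  # then create the strategy
print("replicas in sync:", strategy.num_replicas_in_sync)

with strategy.scope():
    # build and compile the model here so its variables are mirrored across workers
    ...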

vdabravolski avatar Apr 09 '22 21:04 vdabravolski

Is there any update on this? Is anybody working on a PR?

I see a TF_CONFIG setup is already implemented in https://github.com/aws/sagemaker-tensorflow-training-toolkit/blob/master/src/sagemaker_tensorflow_container/training.py#L37. It would only need some minor modifications for MWMS. The only task remaining is to add a new distribution option named 'multi_worker_mirrored' and include it in this condition: https://github.com/aws/sagemaker-tensorflow-training-toolkit/blob/master/src/sagemaker_tensorflow_container/training.py#L139

I will be happy to cut a PR for this if required. This has been open for way too long.

Lokiiiiii avatar May 17 '22 06:05 Lokiiiiii

Hey @Lokiiiiii, so is the aim of your feature to add a distribution argument for MultiWorkerMirroredStrategy in SageMaker? i.e. estimator = sagemaker.tensorflow.TensorFlow(entry_point='script.py', ..........distribution = {}). What would the distribution argument be? I know the distribution argument is needed for something like SageMaker distributed data parallel. Right now I use the function provided by vdabravolski to set up the cluster config.

TZeng20 avatar Jun 23 '22 11:06 TZeng20

Hey @Lokiiiiii, so is the aim of your feature to add a distribution argument for MultiWorkerMirroredStrategy in SageMaker? i.e. estimator = sagemaker.tensorflow.TensorFlow(entry_point='script.py', ..........distribution = {}). What would the distribution argument be? I know the distribution argument is needed for something like SageMaker distributed data parallel. Right now I use the function provided by vdabravolski to set up the cluster config.

Yes, I have suggested distribution = {'multi_worker_mirrored_strategy': {'enabled': True }} in this PR.
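
For illustration, the proposed interface would look roughly like the following; the distribution key follows the proposal above, while the entry point, role, instance type, and framework versions are placeholders:

from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    entry_point="script.py",            # placeholder training script
    role="<execution-role-arn>",        # placeholder IAM role
    instance_count=2,
    instance_type="ml.p3.8xlarge",
    framework_version="2.9",
    py_version="py39",
    distribution={"multi_worker_mirrored_strategy": {"enabled": True}},
)
estimator.fit()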

Lokiiiiii avatar Jun 23 '22 13:06 Lokiiiiii

Does anything else apart from the TF_CONFIG need to be configured to make MultiWorkerMirroredStrategy work? Let's say I use 2 instances of ml.g4dn.8xlarge, which has only 1 GPU per machine.
estimator = sagemaker.tensorflow.TensorFlow(entry_point='script.py', instance_count=2, .........)

If I use the same config from @vdabravolski, it identifies the 2 workers properly (len(tf_config['cluster']['worker']) == 2), but it seems that the tf.distribute.InputContext is unable to properly identify the number of replicas in sync or the number of input pipelines. Shouldn't num_input_pipelines and num_replicas_in_sync be 2?

import tensorflow as tf

strategy = tf.distribute.MultiWorkerMirroredStrategy()

def dataset_fn(input_context):
    # each worker should report its shard of the distributed input pipeline here
    dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(64).batch(16)
    print(input_context.num_input_pipelines, input_context.input_pipeline_id, input_context.num_replicas_in_sync)
    return dataset

dist_dataset = strategy.distribute_datasets_from_function(dataset_fn)

>>>
1 0 1
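
One way to narrow this down is to verify, on every node, that TF_CONFIG is set before the strategy is constructed; a diagnostic sketch:

import os
import tensorflow as tf

print("TF_CONFIG:", os.environ.get("TF_CONFIG"))              # should list both workers on every node
strategy = tf.distribute.MultiWorkerMirroredStrategy()
print("replicas in sync:", strategy.num_replicas_in_sync)     # expected: 2 for two single-GPU workers
print("cluster spec:", strategy.cluster_resolver.cluster_spec())

If num_replicas_in_sync stays at 1, the strategy most likely did not see TF_CONFIG at the time it was created.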

TZeng20 avatar Jun 24 '22 06:06 TZeng20

tf.distribute.InputContext is unable to properly identify the number of replicas in sync or number of input pipelines

This seems like a discussion for https://github.com/tensorflow/tensorflow/issues?q=is%3Aissue+multiworkermirroredstrategy+

Lokiiiiii avatar Jul 04 '22 16:07 Lokiiiiii

Sharing an implementation of a working TF_CONFIG for MultiWorkerMirroredStrategy below. This has been tested on a SageMaker Deep Learning Container with TensorFlow v2.8 (link to dockerfile).

import json
import os


def _build_tf_config():
    # SM_HOSTS is a JSON-encoded list of all host names in the training cluster;
    # SM_CURRENT_HOST is the name of the host this process runs on.
    hosts = json.loads(os.getenv("SM_HOSTS"))
    current_host = os.getenv("SM_CURRENT_HOST")

    workers = hosts

    def host_addresses(hosts, port=7777):
        # any free port works, as long as the same port is used on every host
        return ["{}:{}".format(host, port) for host in hosts]

    tf_config = {"cluster": {}, "task": {}}
    tf_config["cluster"]["worker"] = host_addresses(workers)
    tf_config["task"] = {"index": workers.index(current_host), "type": "worker"}

    os.environ["TF_CONFIG"] = json.dumps(tf_config)

    return

Where would you add this function? There is also a similar function here: https://docs.aws.amazon.com/sagemaker/latest/dg/training-compiler-tensorflow-models.html, but when I use this function at the start of my training script I see a runtime error (RuntimeError: Collective ops must be configured at program startup), and if I call it after creating the strategy it doesn't work as multi-node :(

surajitkundu-dazn avatar Sep 22 '22 06:09 surajitkundu-dazn

The cluster setup required for MultiWorkerMirroredStrategy is now available as part of the SageMaker Training Toolkit for TF >= 2.9. Since we are still working on the SageMaker SDK UI for the same, in the meantime you can leverage the cluster setup by setting an environment variable, sagemaker_multi_worker_mirrored_strategy_enabled, to 'true'.

As indicated by the error message, the cluster setup, or in this case the environment variable, needs to be set before the training script is executed. You can specify the environment keyword argument when creating the estimator if using the SageMaker SDK.
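
For example, with the SageMaker Python SDK this could look roughly like the following (instance type, count, role, and framework versions are placeholders; the environment variable name is the one mentioned above):

from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    entry_point="script.py",            # placeholder training script
    role="<execution-role-arn>",        # placeholder IAM role
    instance_count=2,
    instance_type="ml.g4dn.8xlarge",
    framework_version="2.9",
    py_version="py39",
    environment={"sagemaker_multi_worker_mirrored_strategy_enabled": "true"},
)
estimator.fit()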

Lokiiiiii avatar Sep 23 '22 05:09 Lokiiiiii