
distributed training only on CPUs


Describe the question.

Hello. I need to use DALI for distributed training on CPUs only; the system where I'm running my benchmark does not have any GPUs. I've tried the EfficientDet example from the DALI repo, but its distributed strategies work either on GPUs or on a single CPU. Could you guide me?

Check for duplicates

  • [x] I have searched the open bugs/issues and have found no duplicates for this bug report

zmasih · Jan 17 '25 08:01

Hi @zmasih,

Can you tell me how you run the example and what kind of error you observe?

I cannot rule out that the code itself is not adjusted to run only on the CPU; we added a CPU variant of the pipeline, but not of the model itself.

JanuszL · Jan 17 '25 08:01

I'm running

python train.py --pipeline dali_cpu --epochs 1 --input_type tfrecord --train_file_pattern './tfrecords/train/*.tfrecord' --batch_size 16 --train_steps 1

For num_devices = 1 it works well (no strategy is used), but for more than one device I get ValueError: dali_cpu pipeline is not compatible with multi_gpu mode :<

I understand the problem, but I don't know how to change the multi_gpu mode.

zmasih · Jan 17 '25 08:01

Hi @zmasih,

Indeed, the code is an example that was not written with more than one device in mind for the CPU. In your case, if you want to run the training on a single server, I think that one device is more than enough, as DALI and TensorFlow will use multiple cores under the hood. If you want to use multiple nodes, you can check the RN50 example using Horovod, which should scale better across nodes.

JanuszL · Jan 17 '25 09:01

@JanuszL Thank you for your answer.

So, you are saying that if more than one device is available, DALI will use multiple CPU cores to load, decode, and preprocess data concurrently, without any explicit request when running the code?

And as a quick check before starting with RN50: can you confirm that on my system, which has no GPU, I can use distributed training on multiple nodes for this use case?

zmasih · Jan 17 '25 10:01

So, you are saying that if more than one device is available, DALI will use multiple CPU cores to load, decode, and preprocess data concurrently, without any explicit request when running the code?

Yes; the only thing you may want to adjust is the number of CPU threads that DALI uses (the num_threads argument). See the sketch at the end of this comment.

And as a quick check before starting with RN50: can you confirm that on my system, which has no GPU, I can use distributed training on multiple nodes for this use case?

I believe the Horovod approach should work in general with DALI on the CPU; however, I cannot say whether the examples we have will work, especially EfficientDet, which uses the native TF distributed strategy.
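
For illustration, a minimal sketch of a CPU-only pipeline where num_threads is the main knob (the batch size and file_root path here are placeholders, not taken from the EfficientDet example):

from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn

# num_threads is the number of CPU worker threads DALI uses for this pipeline
@pipeline_def(batch_size=16, num_threads=8, device_id=None)
def cpu_pipeline():
    # hypothetical dataset layout; replace file_root with your own data
    jpegs, labels = fn.readers.file(file_root="/path/to/images")
    images = fn.decoders.image(jpegs, device="cpu")
    return images, labels

pipe = cpu_pipeline()
pipe.build()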

JanuszL · Jan 17 '25 11:01

I've tried

python train.py --pipeline dali_cpu --epochs 1 --input_type tfrecord --train_file_pattern './tfrecords/train/*.tfrecord' --batch_size 16 --train_steps 1

on a system with no GPU. I set device_id=None and cpu_only=True in docs/examples/use_cases/tensorflow/efficientdet/pipeline/dali/efficientdet_pipeline.py.

But I still get the following error:

dlopen libcuda.so failed. Please install GPU driver.DALI daliCreatePipeline3( pipeline_handle, pipeline_def_.pipeline.c_str(), pipeline_def_.pipeline.length(), pipeline_def_.batch_size, pipeline_def_.num_threads, pipeline_def_.device_id, flags, pipeline_def_.prefetch_queue_depth, pipeline_def_.cpu_prefetch_queue_depth, pipeline_def_.gpu_prefetch_queue_depth, pipeline_def_.enable_memory_stats) failed: Assert on "device_id == CPU_ONLY_DEVICE_ID || cuInitChecked()" failed: You are trying to create a GPU DALI pipeline, while CUDA is not available. Please install CUDA or set `device_id = None` in Pipeline constructor. If running inside Docker container, you may need to use  `--gpus` option.
2025-01-20 16:27:34.398126: F dali_dataset_op.cc:1019] Non-OK-status: InitPipeline(&pipeline_handle)
Status: INTERNAL: DALI daliCreatePipeline3( pipeline_handle, pipeline_def_.pipeline.c_str(), pipeline_def_.pipeline.length(), pipeline_def_.batch_size, pipeline_def_.num_threads, pipeline_def_.device_id, flags, pipeline_def_.prefetch_queue_depth, pipeline_def_.cpu_prefetch_queue_depth, pipeline_def_.gpu_prefetch_queue_depth, pipeline_def_.enable_memory_stats) failed: Assert on "device_id == CPU_ONLY_DEVICE_ID || cuInitChecked()" failed: You are trying to create a GPU DALI pipeline, while CUDA is not available. Please install CUDA or set `device_id = None` in Pipeline constructor. If running inside Docker container, you may need to use  `--gpus` option.
Aborted (core dumped)

Can you please guide me? It worked for me on a system with CUDA, even for --pipeline dali_cpu.

zmasih · Jan 20 '25 15:01

Hi @zmasih,

As I mentioned, the example is not prepared to run without a GPU, even if the pipeline itself can run on the CPU. Each DALI pipeline is assigned to a device (GPU) based on the TF distributed strategy, and once a device id is provided, DALI tries to initialize CUDA.

What you can do is check whether providing None for the device id at line 74 of docs/examples/use_cases/tensorflow/efficientdet/pipeline/dali/efficientdet_pipeline.py helps.
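
For reference, this is the same knob the assertion in your error message points at; a hedged sketch using the bare Pipeline constructor (the actual code in efficientdet_pipeline.py will look different):

from nvidia.dali import Pipeline

# device_id=None (CPU_ONLY_DEVICE_ID) requests a CPU-only pipeline,
# so DALI skips the CUDA initialization that the assertion complains about
pipe = Pipeline(batch_size=16, num_threads=4, device_id=None)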

JanuszL · Jan 20 '25 15:01

Thank you, @JanuszL. That also didn't help. Since DALI is primarily designed for GPUs, I haven't found much relevant guidance online. Would you have any additional resources or suggestions for implementing distributed training with DALI on CPU-only systems?

zmasih · Jan 20 '25 17:01

Hi @zmasih,

You can start with this toy example, which runs DALI on the CPU (tested inside a Jupyter notebook):

import os
import time

test_data_root = os.environ["DALI_EXTRA_PATH"]

# MXNet RecordIO
base = os.path.join(test_data_root, "db", "recordio")

idx_files = [base + "/train.idx"]
rec_files = [base + "/train.rec"]

BATCH_SIZE = 32
ITERATIONS = 32
BURNIN_STEPS = 16

from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import nvidia.dali.plugin.tf as dali_tf
import tensorflow as tf
from tensorflow.compat.v1 import Session

tf.compat.v1.disable_eager_execution()

import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt

%matplotlib inline


def show_images(image_batch, nb_images):
    columns = 4
    rows = (nb_images + 1) // (columns)
    fig = plt.figure(figsize=(32, (32 // columns) * rows))
    gs = gridspec.GridSpec(rows, columns)
    for j in range(nb_images):
        plt.subplot(gs[j])
        plt.axis("off")
        img = image_batch[0][j].transpose((1, 2, 0)) + 128
        plt.imshow(img.astype("uint8"))


@pipeline_def(batch_size=BATCH_SIZE, num_threads=4)
def rn50_pipeline(device):
    jpegs, labels = fn.readers.mxnet(
        path=rec_files, index_path=idx_files, name="Reader"
    )
    images = fn.decoders.image(
        jpegs, device="mixed" if device == "gpu" else "cpu"
    )
    images = fn.resize(
        images,
        resize_shorter=fn.random.uniform(range=(256.0, 480.0)),
        interp_type=types.INTERP_LINEAR,
    )
    images = fn.crop_mirror_normalize(
        images,
        crop_pos_x=fn.random.uniform(range=(0.0, 1.0)),
        crop_pos_y=fn.random.uniform(range=(0.0, 1.0)),
        dtype=types.FLOAT,
        crop=(224, 224),
        mean=[128.0, 128.0, 128.0],
        std=[1.0, 1.0, 1.0],
    )
    images = fn.cast(images, dtype=types.INT32)

    if device == "gpu":
        labels = labels.gpu()

    return images, labels


# device_id=None builds a CPU-only pipeline, so DALI never initializes CUDA
cpu_pipe = rn50_pipeline(device="cpu", device_id=None)

# expose the DALI pipeline as a TF op (tf.compat.v1 graph mode)
daliop = dali_tf.DALIIterator()

images_cpu = []
labels_cpu = []
with tf.device("/cpu"):
    image_cpu, label_cpu = daliop(
        pipeline=cpu_pipe,
        shapes=[(BATCH_SIZE, 3, 224, 224), ()],
        dtypes=[tf.int32, tf.float32],
    )

    images_cpu.append(image_cpu)
    labels_cpu.append(label_cpu)

with Session() as sess:
    all_img_per_sec = []
    total_batch_size = BATCH_SIZE

    for i in range(ITERATIONS):
        start_time = time.time()

        # The actual run with our dali_tf tensors
        res_cpu = sess.run([images_cpu, labels_cpu])

        elapsed_time = time.time() - start_time
        img_per_sec = total_batch_size / elapsed_time
        if i > BURNIN_STEPS:
            all_img_per_sec.append(img_per_sec)
            print("\t%7.1f img/s" % img_per_sec)

    print(
        "Total average %7.1f img/s"
        % (sum(all_img_per_sec) / len(all_img_per_sec))
    )
show_images(res_cpu[0], 8)

With distributed training, you need to make sure that the DALI reader's shard_id is set appropriately for the given node and that num_shards equals the world size.
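
For example, a hedged sketch of that sharding with Horovod (assuming hvd.rank() and hvd.size() return the process rank and world size; BATCH_SIZE, rec_files, and idx_files are reused from the toy example above):

import horovod.tensorflow as hvd

hvd.init()

@pipeline_def(batch_size=BATCH_SIZE, num_threads=4, device_id=None)
def sharded_cpu_pipeline():
    jpegs, labels = fn.readers.mxnet(
        path=rec_files,
        index_path=idx_files,
        shard_id=hvd.rank(),    # this process reads only its own shard
        num_shards=hvd.size(),  # total number of shards == world size
        name="Reader",
    )
    images = fn.decoders.image(jpegs, device="cpu")
    return images, labels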

JanuszL · Jan 20 '25 18:01