distributed training only on CPUs
Describe the question.
Hello. I need to use DALI for distributed training only on CPUs. The system where I'm running my benchmark does not have any GPUs. I've tried the 'EfficientDet' example from the DALI repo, but it only works either with distributed strategies on GPUs or on a single CPU. Could you guide me?
Check for duplicates
- [x] I have searched the open bugs/issues and have found no duplicates for this bug report
Hi @zmasih,
Can you tell me how you run the example and what kind of error you observe?
I cannot rule out that the code itself is not adjusted to run only on the CPU; we added a CPU variant of the pipeline, but not of the model itself.
I'm running
python train.py --pipeline dali_cpu --epochs 1 --input_type tfrecord --train_file_pattern './tfrecords/train/*.tfrecord' --batch_size 16 --train_steps 1
With num_devices = 1 it works well (no strategy is used), but for more than one device I get:
ValueError: dali_cpu pipeline is not compatible with multi_gpu mode :<
I understand the problem, but I don't know how to change the multi_gpu mode.
Hi @zmasih,
Indeed, the example was not written with more than one node in mind for the CPU. In your case, if you want to run the training on a single server, I think one device is more than enough, as DALI and TF will use multiple cores under the hood. If you want to use multiple nodes, you can check the RN50 example that uses Horovod, which should scale better across multiple nodes.
@JanuszL Thank you for your answer.
So, you are saying that if more than one device is available, with no explicit request, when running the code, DALI will use multiple CPU cores to load, decode, and preprocess data concurrently?
And as a quick recheck before starting with RN50, can you confirm that on my system, where there is no GPU, I can use distributed training on multiple nodes with this use case?
> So, you are saying that if more than one device is available, with no explicit request, when running the code, DALI will use multiple CPU cores to load, decode, and preprocess data concurrently?
Yes, the only thing you can adjust is the number of CPU threads that DALI uses (num_threads argument).
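For reference, a minimal sketch of what that looks like (the reader, file paths, and pipeline name here are placeholders, not the EfficientDet pipeline itself):

from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn

# num_threads controls how many CPU worker threads DALI uses internally;
# device_id=None keeps the pipeline CPU-only (no CUDA initialization).
@pipeline_def(batch_size=32, num_threads=8, device_id=None)
def cpu_only_pipeline(data_dir):
    jpegs, labels = fn.readers.file(file_root=data_dir, name="Reader")
    images = fn.decoders.image(jpegs, device="cpu")
    return images, labels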
> You confirm that on my system where there is no GPU, I can use distributed training on multiple nodes with this use case.
I believe that the Horovod approach should work in general with DALI on the CPU; however, I cannot say whether the examples we have will work, especially EfficientDet, which uses the native TF distributed strategy.
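Roughly, the Horovod approach maps each process to one data shard. A sketch of the bookkeeping (assuming Horovod is installed; this is not taken from the RN50 example itself):

import horovod.tensorflow as hvd

hvd.init()

# On a CPU-only system there is no GPU to pin; each process only needs
# to know which shard of the dataset it should read.
shard_id = hvd.rank()     # index of this process among all workers
num_shards = hvd.size()   # total number of workers (world size)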
I've tried
python train.py --pipeline dali_cpu --epochs 1 --input_type tfrecord --train_file_pattern './tfrecords/train/*.tfrecord' --batch_size 16 --train_steps 1
on a system with no GPU.
I set device_id=None and cpu_only=True in
docs/examples/use_cases/tensorflow/efficientdet/pipeline/dali/efficientdet_pipeline.py
But I still get the following error:
dlopen libcuda.so failed. Please install GPU driver.DALI daliCreatePipeline3( pipeline_handle, pipeline_def_.pipeline.c_str(), pipeline_def_.pipeline.length(), pipeline_def_.batch_size, pipeline_def_.num_threads, pipeline_def_.device_id, flags, pipeline_def_.prefetch_queue_depth, pipeline_def_.cpu_prefetch_queue_depth, pipeline_def_.gpu_prefetch_queue_depth, pipeline_def_.enable_memory_stats) failed: Assert on "device_id == CPU_ONLY_DEVICE_ID || cuInitChecked()" failed: You are trying to create a GPU DALI pipeline, while CUDA is not available. Please install CUDA or set `device_id = None` in Pipeline constructor. If running inside Docker container, you may need to use `--gpus` option.
2025-01-20 16:27:34.398126: F dali_dataset_op.cc:1019] Non-OK-status: InitPipeline(&pipeline_handle)
Status: INTERNAL: DALI daliCreatePipeline3( pipeline_handle, pipeline_def_.pipeline.c_str(), pipeline_def_.pipeline.length(), pipeline_def_.batch_size, pipeline_def_.num_threads, pipeline_def_.device_id, flags, pipeline_def_.prefetch_queue_depth, pipeline_def_.cpu_prefetch_queue_depth, pipeline_def_.gpu_prefetch_queue_depth, pipeline_def_.enable_memory_stats) failed: Assert on "device_id == CPU_ONLY_DEVICE_ID || cuInitChecked()" failed: You are trying to create a GPU DALI pipeline, while CUDA is not available. Please install CUDA or set `device_id = None` in Pipeline constructor. If running inside Docker container, you may need to use `--gpus` option.
Aborted (core dumped)
Can you please guide me? It worked for me on a system with CUDA, even with --pipeline dali_cpu.
Hi @zmasih,
As I mentioned, the example is not prepared to run without a GPU even if the pipeline itself can run on the CPU. Each DALI pipeline is assigned to a device (GPU) based on the TF distributed strategy, and once a device id is provided, DALI tries to initialize CUDA.
What you can do is check whether providing None for the device id in L74 of docs/examples/use_cases/tensorflow/efficientdet/pipeline/dali/efficientdet_pipeline.py helps.
Thank you @JanuszL. That also didn't help. Since DALI is primarily designed for GPUs, I haven't found much relevant guidance online. Would you have any additional resources or suggestions that might help with implementing distributed training with DALI on CPU-only systems?
Hi @zmasih,
You can start with this toy example:
import os.path
test_data_root = os.environ["DALI_EXTRA_PATH"]
# MXNet RecordIO
base = os.path.join(test_data_root, "db", "recordio")
idx_files = [base + "/train.idx"]
rec_files = [base + "/train.rec"]
BATCH_SIZE = 32
ITERATIONS = 32
BURNIN_STEPS = 16
from nvidia.dali import pipeline_def, Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import tensorflow as tf
import nvidia.dali.plugin.tf as dali_tf
import time
from tensorflow.compat.v1 import GPUOptions
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import Session
from tensorflow.compat.v1 import placeholder
tf.compat.v1.disable_eager_execution()
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
%matplotlib inline
def show_images(image_batch, nb_images):
    columns = 4
    rows = (nb_images + 1) // (columns)
    fig = plt.figure(figsize=(32, (32 // columns) * rows))
    gs = gridspec.GridSpec(rows, columns)
    for j in range(nb_images):
        plt.subplot(gs[j])
        plt.axis("off")
        # Undo the CHW layout and the mean subtraction done in the pipeline
        img = image_batch[0][j].transpose((1, 2, 0)) + 128
        plt.imshow(img.astype("uint8"))


@pipeline_def(batch_size=BATCH_SIZE, num_threads=4)
def rn50_pipeline(device):
    jpegs, labels = fn.readers.mxnet(
        path=rec_files, index_path=idx_files, name="Reader"
    )
    images = fn.decoders.image(
        jpegs, device="mixed" if device == "gpu" else "cpu"
    )
    images = fn.resize(
        images,
        resize_shorter=fn.random.uniform(range=(256.0, 480.0)),
        interp_type=types.INTERP_LINEAR,
    )
    images = fn.crop_mirror_normalize(
        images,
        crop_pos_x=fn.random.uniform(range=(0.0, 1.0)),
        crop_pos_y=fn.random.uniform(range=(0.0, 1.0)),
        dtype=types.FLOAT,
        crop=(224, 224),
        mean=[128.0, 128.0, 128.0],
        std=[1.0, 1.0, 1.0],
    )
    images = fn.cast(images, dtype=types.INT32)
    if device == "gpu":
        labels = labels.gpu()
    return images, labels


# device_id=None makes this a CPU-only pipeline (no CUDA initialization)
cpu_pipe = rn50_pipeline(device="cpu", device_id=None)

daliop = dali_tf.DALIIterator()

images_cpu = []
labels_cpu = []
with tf.device("/cpu"):
    image_cpu, label_cpu = daliop(
        pipeline=cpu_pipe,
        shapes=[(BATCH_SIZE, 3, 224, 224), ()],
        dtypes=[tf.int32, tf.float32],
    )
    images_cpu.append(image_cpu)
    labels_cpu.append(label_cpu)

with Session() as sess:
    all_img_per_sec = []
    total_batch_size = BATCH_SIZE

    for i in range(ITERATIONS):
        start_time = time.time()

        # The actual run with our dali_tf tensors
        res_cpu = sess.run([images_cpu, labels_cpu])

        elapsed_time = time.time() - start_time
        img_per_sec = total_batch_size / elapsed_time
        if i > BURNIN_STEPS:
            all_img_per_sec.append(img_per_sec)
            print("\t%7.1f img/s" % img_per_sec)

    print(
        "Total average %7.1f img/s"
        % (sum(all_img_per_sec) / len(all_img_per_sec))
    )

show_images(res_cpu[0], 8)
to run DALI on the CPU (tested inside a Jupyter notebook).
With distributed training, you need to make sure that the DALI reader's shard_id is set accordingly for a given node and that num_shards is equal to the world size.
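For example, the reader in the toy pipeline above could be sharded like this (a sketch; shard_id and num_shards would come from your launcher, e.g. Horovod rank/size or an environment variable):

@pipeline_def(batch_size=BATCH_SIZE, num_threads=4, device_id=None)
def sharded_rn50_pipeline(shard_id, num_shards):
    # Each node/process reads only its own part of the RecordIO data.
    jpegs, labels = fn.readers.mxnet(
        path=rec_files,
        index_path=idx_files,
        shard_id=shard_id,      # this node's shard index
        num_shards=num_shards,  # world size
        name="Reader",
    )
    images = fn.decoders.image(jpegs, device="cpu")
    return images, labels

pipe = sharded_rn50_pipeline(shard_id=0, num_shards=2)  # e.g. node 0 of 2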