NVTabular icon indicating copy to clipboard operation
NVTabular copied to clipboard

[BUG] LambdaOp doesnt work with multi-GPU cluster, multiple output workers

Open bschifferer opened this issue 3 years ago • 3 comments

Describe the bug I run a multi=GPU NVTabular workflow with LambdaOps. It executes the fit functionality, but when the pipeline does tramsform and to parquet, I get following error. I I run on single GPU, it does work.

Failed to transform operator <nvtabular.ops.lambdaop.LambdaOp object at 0x7fbb4a93f2e0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/nvtabular/workflow/workflow.py", line 539, in _transform_partition
    f"Dtype discrepancy detected for column {col_name}: "
  File "/usr/local/lib/python3.8/dist-packages/nvtabular/ops/lambdaop.py", line 103, in label
    source = getsourcelines(self.f)[0][0]
  File "/usr/lib/python3.8/inspect.py", line 979, in getsourcelines
    lines, lnum = findsource(object)
  File "/usr/lib/python3.8/inspect.py", line 798, in findsource
    raise OSError('could not get source code')
OSError: could not get source code
2022-07-20 10:17:49,872 - distributed.worker - WARNING - Compute Failed
Key:       ('write-processed-2c66d3a6a6f1b9c54db536c61f1e311e-partition2c66d3a6a6f1b9c54db536c61f1e311e', "('part_50.parquet', 'part_51.parquet', 'part_52.parquet', 'part_53.parquet', 'part_54.parquet', 'part_55.parquet', 'part_56.parquet', 'part_57.parquet', 'part_58.parquet', 'part_59.parquet')")
Function:  _write_subgraph
args:      (<merlin.io.dask.DaskSubgraph object at 0x7fbb4ab4d550>, ('part_50.parquet', 'part_51.parquet', 'part_52.parquet', 'part_53.parquet', 'part_54.parquet', 'part_55.parquet', 'part_56.parquet', 'part_57.parquet', 'part_58.parquet', 'part_59.parquet'), '/raid/moj_feed_data_v1_sample1_parquet_test/', None, <fsspec.implementations.local.LocalFileSystem object at 0x7fbb4aaee2b0>, [], [], [], 'parquet', 0, False, '')
kwargs:    {}
Exception: "OSError('could not get source code')"

Steps/Code to reproduce bug I use the criteo dask cluster initalization

# Dask dashboard
dashboard_port = "8787"
dask_workdir = '/raid/dask/'

# Deploy a Single-Machine Multi-GPU Cluster
protocol = "tcp"  # "tcp" or "ucx"
if numba.cuda.is_available():
    NUM_GPUS = list(range(len(numba.cuda.gpus)))
else:
    NUM_GPUS = []
visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Delect devices to place workers
device_limit_frac = 0.7  # Spill GPU-Worker memory to host at this limit.
device_pool_frac = 0.8
part_mem_frac = 0.15

# Use total device size to calculate args.device_limit_frac
device_size = device_mem_size(kind="total")
device_limit = int(device_limit_frac * device_size)
device_pool_size = int(device_pool_frac * device_size)
part_size = int(part_mem_frac * device_size)

# Check if any device memory is already occupied
for dev in visible_devices.split(","):
    fmem = pynvml_mem_size(kind="free", index=int(dev))
    used = (device_size - fmem) / 1e9
    if used > 1.0:
        warnings.warn(f"BEWARE - {used} GB is already occupied on device {int(dev)}!")

cluster = None  # (Optional) Specify existing scheduler port
if cluster is None:
    cluster = LocalCUDACluster(
        protocol=protocol,
        n_workers=len(visible_devices.split(",")),
        CUDA_VISIBLE_DEVICES=visible_devices,
        device_memory_limit=device_limit,
        local_directory=dask_workdir,
        dashboard_address=":" + dashboard_port,
        rmm_pool_size=(device_pool_size // 256) * 256
    )

# Create the distributed client
client = Client(cluster)
client

E.g.

col_cat_int8 = col_cat_int8 >> nvt.ops.Categorify() >> nvt.ops.LambdaOp(lambda x: x.astype('int8'))
workflow.transform(dataset).to_parquet(output_path, out_files_per_proc=10)

bschifferer avatar Jul 20 '22 10:07 bschifferer

The only thing I know that serialized Python lambdas is cloudpickle, so I guess we could serialize the Workflow to cloudpickle and then deserialize it on the workers? Not too sure how to do that, but it's the only approach I can think of.

karlhigley avatar Jul 20 '22 21:07 karlhigley

@bschifferer, as a short-term workaround - can you try this instead to explicitly set the dtype in the LambdaOp?

col_cat_int8 = col_cat_int8 >> nvt.ops.Categorify() >> nvt.ops.LambdaOp(lambda x: x.astype('int8'), dtype="int8")
workflow.transform(dataset).to_parquet(output_path, out_files_per_proc=10)

This should at least cause your workflow to execute on multiple workers.

I think the root cause of the issue here is that the 'capture_dtypes' functionality doesn't work on a distributed cluster environment - since the dtypes are only captured here https://github.com/NVIDIA-Merlin/NVTabular/blob/bc71785aab96368fb51f60d876fdfecba709e494/nvtabular/workflow/workflow.py#L534-L535 on the worker process, and the dtypes for each node in the graph aren't communicated back to the actual workflow.

The LambdaOp issue is a red-herring - we're already using cloudpickle under the hood (either to save the workflow itself, or distribute work with dask https://distributed.dask.org/en/stable/serialization.html#defaults . The issue was in raising the exception, called the LambdaOp.label functionality which failed =(. I've fixed the LambdaOp.label call here https://github.com/NVIDIA-Merlin/NVTabular/pull/1634- and added a basic test using LambdaOp's with dask there too

benfred avatar Jul 23 '22 03:07 benfred

Yes that worked around works for me - thanks

bschifferer avatar Jul 27 '22 12:07 bschifferer