NVTabular
[BUG] LambdaOp doesn't work with multi-GPU cluster, multiple output workers
Describe the bug I run a multi-GPU NVTabular workflow with LambdaOps. The fit functionality executes fine, but when the pipeline runs transform and to_parquet, I get the following error. If I run on a single GPU, it does work.
Failed to transform operator <nvtabular.ops.lambdaop.LambdaOp object at 0x7fbb4a93f2e0>
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/nvtabular/workflow/workflow.py", line 539, in _transform_partition
f"Dtype discrepancy detected for column {col_name}: "
File "/usr/local/lib/python3.8/dist-packages/nvtabular/ops/lambdaop.py", line 103, in label
source = getsourcelines(self.f)[0][0]
File "/usr/lib/python3.8/inspect.py", line 979, in getsourcelines
lines, lnum = findsource(object)
File "/usr/lib/python3.8/inspect.py", line 798, in findsource
raise OSError('could not get source code')
OSError: could not get source code
2022-07-20 10:17:49,872 - distributed.worker - WARNING - Compute Failed
Key: ('write-processed-2c66d3a6a6f1b9c54db536c61f1e311e-partition2c66d3a6a6f1b9c54db536c61f1e311e', "('part_50.parquet', 'part_51.parquet', 'part_52.parquet', 'part_53.parquet', 'part_54.parquet', 'part_55.parquet', 'part_56.parquet', 'part_57.parquet', 'part_58.parquet', 'part_59.parquet')")
Function: _write_subgraph
args: (<merlin.io.dask.DaskSubgraph object at 0x7fbb4ab4d550>, ('part_50.parquet', 'part_51.parquet', 'part_52.parquet', 'part_53.parquet', 'part_54.parquet', 'part_55.parquet', 'part_56.parquet', 'part_57.parquet', 'part_58.parquet', 'part_59.parquet'), '/raid/moj_feed_data_v1_sample1_parquet_test/', None, <fsspec.implementations.local.LocalFileSystem object at 0x7fbb4aaee2b0>, [], [], [], 'parquet', 0, False, '')
kwargs: {}
Exception: "OSError('could not get source code')"
Steps/Code to reproduce bug I use the Criteo Dask cluster initialization:
import warnings
import numba
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from merlin.core.utils import device_mem_size, pynvml_mem_size
# Dask dashboard
dashboard_port = "8787"
dask_workdir = '/raid/dask/'
# Deploy a Single-Machine Multi-GPU Cluster
protocol = "tcp"  # "tcp" or "ucx"
if numba.cuda.is_available():
    NUM_GPUS = list(range(len(numba.cuda.gpus)))
else:
    NUM_GPUS = []
visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Select devices to place workers
device_limit_frac = 0.7  # Spill GPU-Worker memory to host at this limit.
device_pool_frac = 0.8
part_mem_frac = 0.15
# Use total device size to calculate args.device_limit_frac
device_size = device_mem_size(kind="total")
device_limit = int(device_limit_frac * device_size)
device_pool_size = int(device_pool_frac * device_size)
part_size = int(part_mem_frac * device_size)
# Check if any device memory is already occupied
for dev in visible_devices.split(","):
    fmem = pynvml_mem_size(kind="free", index=int(dev))
    used = (device_size - fmem) / 1e9
    if used > 1.0:
        warnings.warn(f"BEWARE - {used} GB is already occupied on device {int(dev)}!")
cluster = None  # (Optional) Specify existing scheduler port
if cluster is None:
    cluster = LocalCUDACluster(
        protocol=protocol,
        n_workers=len(visible_devices.split(",")),
        CUDA_VISIBLE_DEVICES=visible_devices,
        device_memory_limit=device_limit,
        local_directory=dask_workdir,
        dashboard_address=":" + dashboard_port,
        rmm_pool_size=(device_pool_size // 256) * 256,
    )
# Create the distributed client
client = Client(cluster)
client
E.g.
col_cat_int8 = col_cat_int8 >> nvt.ops.Categorify() >> nvt.ops.LambdaOp(lambda x: x.astype('int8'))
workflow.transform(dataset).to_parquet(output_path, out_files_per_proc=10)
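For context, here is a minimal sketch of how the full pipeline is wired up - the column name, dataset path, and output path below are placeholders, not my actual data:
import nvtabular as nvt
# Placeholders for illustration only
cat_cols = ["cat_col"]
output_path = "/raid/output_parquet/"
col_cat_int8 = cat_cols >> nvt.ops.Categorify() >> nvt.ops.LambdaOp(lambda x: x.astype("int8"))
workflow = nvt.Workflow(col_cat_int8, client=client)  # client from the cluster setup above (newer releases can also pick up the global client)
dataset = nvt.Dataset("/raid/input_parquet/", engine="parquet", part_size=part_size)
workflow.fit(dataset)  # fit runs fine on the multi-GPU cluster
workflow.transform(dataset).to_parquet(output_path, out_files_per_proc=10)  # fails on the workers with the OSError above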
The only thing I know of that serializes Python lambdas is cloudpickle, so I guess we could serialize the Workflow with cloudpickle and then deserialize it on the workers? Not too sure how to do that, but it's the only approach I can think of.
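For what it's worth, a tiny standalone illustration of the difference - plain pickle serializes functions by reference and rejects lambdas, while cloudpickle serializes them by value:
import pickle
import cloudpickle
f = lambda x: x.astype("int8")
try:
    pickle.dumps(f)  # fails: the lambda can't be looked up by name on the other side
except (pickle.PicklingError, AttributeError) as err:
    print("pickle failed:", err)
g = cloudpickle.loads(cloudpickle.dumps(f))  # cloudpickle ships the code object itself
print(g)  # a working deserialized copy of the lambda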
@bschifferer, as a short-term workaround - can you try this instead to explicitly set the dtype in the LambdaOp?
col_cat_int8 = col_cat_int8 >> nvt.ops.Categorify() >> nvt.ops.LambdaOp(lambda x: x.astype('int8'), dtype="int8")
workflow.transform(dataset).to_parquet(output_path, out_files_per_proc=10)
This should at least cause your workflow to execute on multiple workers.
I think the root cause of the issue here is that the 'capture_dtypes' functionality doesn't work in a distributed cluster environment - the dtypes are only captured here https://github.com/NVIDIA-Merlin/NVTabular/blob/bc71785aab96368fb51f60d876fdfecba709e494/nvtabular/workflow/workflow.py#L534-L535 on the worker process, and the dtypes for each node in the graph aren't communicated back to the actual workflow.
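To make that concrete, here's a small standalone illustration (not NVTabular code) of the underlying Dask behaviour: state mutated inside a task only changes the worker's deserialized copy of the object, so the update never makes it back to the client process:
from dask.distributed import Client

class DtypeRecorder:
    """Stand-in for an object that tries to capture dtypes during transform."""
    def __init__(self):
        self.dtypes = {}

    def capture(self, name, dtype):
        self.dtypes[name] = dtype  # mutates the copy living on the worker
        return dtype

if __name__ == "__main__":
    client = Client(processes=True, n_workers=1)
    recorder = DtypeRecorder()
    client.submit(recorder.capture, "col_cat_int8", "int8").result()
    print(recorder.dtypes)  # {} - the update stayed on the worker process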
The LambdaOp issue is a red herring - we're already using cloudpickle under the hood (either to save the workflow itself, or to distribute work with Dask: https://distributed.dask.org/en/stable/serialization.html#defaults). The issue was that raising the exception called the LambdaOp.label functionality, which failed =(. I've fixed the LambdaOp.label call here https://github.com/NVIDIA-Merlin/NVTabular/pull/1634 - and added a basic test using LambdaOps with Dask there too.
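For anyone hitting the same traceback elsewhere: the failure boils down to calling inspect.getsourcelines() on a lambda whose source file isn't reachable in the current process (e.g. a notebook-defined lambda deserialized on a remote worker). A rough sketch of the kind of defensive guard needed - not the exact change in the PR:
import cloudpickle
from inspect import getsourcelines
f = cloudpickle.loads(cloudpickle.dumps(lambda x: x.astype("int8")))
try:
    label = getsourcelines(f)[0][0].strip()  # raises OSError when the source file isn't available
except OSError:
    label = "custom_lambda"  # fall back to a generic label instead of crashing the transform
print(label)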
Yes, that workaround works for me - thanks!