NVTabular
[QST] How Do I Solve the Problem That Missing Values Cannot Be Converted to Int Values?
When I run the Scaling Criteo ETL example, the following error is reported.
2023-02-22 16:07:04,128 - distributed.worker - WARNING - Compute Failed
Key: ('write-processed-db337936ac67baee573cd5fd6543337d-partitiondb337936ac67baee573cd5fd6543337d', "('part_4.parquet',)")
Function: _write_subgraph
args: (<merlin.io.dask.DaskSubgraph object at 0x7f1772bc9100>, ('part_4.parquet',), '/raid/data/criteo/test_dask/output/train/', <Shuffle.PER_PARTITION: 0>, <fsspec.implementations.local.LocalFileSystem object at 0x7f175d0740a0>, ['C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'C10', 'C11', 'C12', 'C13', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21', 'C22', 'C23', 'C24', 'C25', 'C26'], ['I1', 'I2', 'I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13'], ['label'], 'parquet', 0, True, '')
kwargs: {}
Exception: "ValueError('cannot convert NA to integer')"
Traceback (most recent call last):
File "/Merlin/examples/scaling-criteo/02_etl_with_nvtabular.py", line 190, in <module>
main()
File "/Merlin/examples/scaling-criteo/02_etl_with_nvtabular.py", line 167, in main
workflow.transform(train_dataset).to_parquet(
File "/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py", line 910, in to_parquet
_ddf_to_dataset(
File "/usr/local/lib/python3.8/dist-packages/merlin/io/dask.py", line 367, in _ddf_to_dataset
out = client.compute(out).result()
File "/usr/local/lib/python3.8/dist-packages/distributed/client.py", line 280, in result
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py", line 101, in inner
result = func(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/merlin/io/dask.py", line 202, in _write_subgraph
table = subgraph[part]
File "/usr/local/lib/python3.8/dist-packages/merlin/io/dask.py", line 53, in __getitem__
return dask.get(dsk, key)
File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 557, in get_sync
return get_async(
File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 500, in get_async
for key, res_info, failed in queue_get(queue).result():
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
return self.__get_result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 542, in submit
fut.set_result(fn(*args, **kwargs))
File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 238, in batch_execute_tasks
return [execute_task(*a) for a in it]
File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 238, in <listcomp>
return [execute_task(*a) for a in it]
File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 229, in execute_task
result = pack_exception(e, dumps)
File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 224, in execute_task
result = _execute_task(task, data)
File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 119, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/usr/local/lib/python3.8/dist-packages/dask/optimization.py", line 990, in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 149, in get
result = _execute_task(task, cache)
File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 119, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 119, in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 113, in _execute_task
return [_execute_task(a, cache) for a in arg]
File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 113, in <listcomp>
return [_execute_task(a, cache) for a in arg]
File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 119, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py", line 89, in __call__
return read_parquet_part(
File "/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py", line 587, in read_parquet_part
dfs = [
File "/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py", line 588, in <listcomp>
func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
File "/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py", line 88, in read_partition
part[k] = part[k].astype(type_name.replace("Int", "int"))
File "/usr/local/lib/python3.8/dist-packages/pandas/core/generic.py", line 6240, in astype
new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
File "/usr/local/lib/python3.8/dist-packages/pandas/core/internals/managers.py", line 448, in astype
return self.apply("astype", dtype=dtype, copy=copy, errors=errors)
File "/usr/local/lib/python3.8/dist-packages/pandas/core/internals/managers.py", line 352, in apply
applied = getattr(b, f)(**kwargs)
File "/usr/local/lib/python3.8/dist-packages/pandas/core/internals/blocks.py", line 526, in astype
new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
File "/usr/local/lib/python3.8/dist-packages/pandas/core/dtypes/astype.py", line 299, in astype_array_safe
new_values = astype_array(values, dtype, copy=copy)
File "/usr/local/lib/python3.8/dist-packages/pandas/core/dtypes/astype.py", line 227, in astype_array
values = values.astype(dtype, copy=copy)
File "/usr/local/lib/python3.8/dist-packages/pandas/core/arrays/masked.py", line 471, in astype
raise ValueError("cannot convert NA to integer")
ValueError: cannot convert NA to integer
Why is the error that NA cannot be converted to an int value still reported? The official example already handles missing values. How can I solve this problem?
The error occurs when I run the code from https://nvidia-merlin.github.io/Merlin/main/examples/scaling-criteo/02-ETL-with-NVTabular.html#
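(For context, the ValueError in the traceback comes from pandas' nullable integer ("Int*") dtype: casting a column that still contains pd.NA to a plain NumPy int dtype raises exactly this error. A minimal sketch outside Merlin, only to illustrate the failing cast:)

import pandas as pd

# Nullable integer column with a missing value (pd.NA)
s = pd.Series([1, None, 3], dtype="Int32")
# Casting to a plain NumPy int dtype raises: ValueError: cannot convert NA to integer
s.astype("int32")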
@gukejun1 can you provide more info about your env? how and where did you install merlin libraries? Are you using a docker image? if yes, which docker image? thanks.
The code is the same as in that notebook. I use the Docker image nvcr.io/nvidia/merlin/merlin-tensorflow:22.12.
@rnyak this is my full code
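(The script below is posted without its import block; a minimal sketch of the imports it relies on, modeled on the 22.12 example notebook, is shown here. The exact module paths, in particular for device_mem_size and pynvml_mem_size, are assumptions and may differ between Merlin releases.)

import os
import re
import shutil
import warnings

import numba
import numpy as np

import nvtabular as nvt
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from merlin.core.utils import device_mem_size, pynvml_mem_size  # assumed import path
from merlin.schema import Tags
from nvtabular.ops import AddMetadata, Categorify, Clip, FillMissing, Normalize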
BASE_DIR = os.environ.get("BASE_DIR", "/raid/data/criteo")
INPUT_DATA_DIR = os.environ.get("INPUT_DATA_DIR", BASE_DIR + "/converted/criteo")
OUTPUT_DATA_DIR = os.environ.get("OUTPUT_DATA_DIR", BASE_DIR + "/test_dask/output")
USE_HUGECTR = bool(os.environ.get("USE_HUGECTR", ""))
print(USE_HUGECTR)

stats_path = os.path.join(OUTPUT_DATA_DIR, "test_dask/stats")
dask_workdir = os.path.join(OUTPUT_DATA_DIR, "test_dask/workdir")

# Make sure we have a clean worker space for Dask
if os.path.isdir(dask_workdir):
    shutil.rmtree(dask_workdir)
os.makedirs(dask_workdir)

# Make sure we have a clean stats space for Dask
if os.path.isdir(stats_path):
    shutil.rmtree(stats_path)
os.mkdir(stats_path)

# Make sure we have a clean output path
if os.path.isdir(OUTPUT_DATA_DIR):
    shutil.rmtree(OUTPUT_DATA_DIR)
os.mkdir(OUTPUT_DATA_DIR)

fname = "day_{}.parquet"
num_days = len(
    [i for i in os.listdir(INPUT_DATA_DIR) if re.match(fname.format("[0-9]{1,2}"), i) is not None]
)
train_paths = [os.path.join(INPUT_DATA_DIR, fname.format(day)) for day in range(num_days - 1)]
valid_paths = [
    os.path.join(INPUT_DATA_DIR, fname.format(day)) for day in range(num_days - 1, num_days)
]
train_paths = "/raid/data/criteo/converted/criteo/day_0_40000000.parquet"
valid_paths = "/raid/data/criteo/converted/criteo/day_1_4000000.parquet"
print(train_paths)
print(valid_paths)

# Dask dashboard
dashboard_port = "8787"
protocol = "tcp"  # "tcp" or "ucx"
if numba.cuda.is_available():
    NUM_GPUS = list(range(len(numba.cuda.gpus)))
else:
    NUM_GPUS = []
visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Select devices to place workers
device_limit_frac = 0.7  # Spill GPU-Worker memory to host at this limit.
device_pool_frac = 0.8
part_mem_frac = 0.15

# Use total device size to calculate args.device_limit_frac
device_size = device_mem_size(kind="total")
device_limit = int(device_limit_frac * device_size)
device_pool_size = int(device_pool_frac * device_size)
part_size = int(part_mem_frac * device_size)

# Check if any device memory is already occupied
for dev in visible_devices.split(","):
    fmem = pynvml_mem_size(kind="free", index=int(dev))
    used = (device_size - fmem) / 1e9
    if used > 1.0:
        warnings.warn(f"BEWARE - {used} GB is already occupied on device {int(dev)}!")

cluster = None  # (Optional) Specify existing scheduler port
if cluster is None:
    cluster = LocalCUDACluster(
        protocol=protocol,
        n_workers=len(visible_devices.split(",")),
        CUDA_VISIBLE_DEVICES=visible_devices,
        device_memory_limit=device_limit,
        local_directory=dask_workdir,
        dashboard_address=":" + dashboard_port,
        rmm_pool_size=(device_pool_size // 256) * 256,
    )

# Create the distributed client
client = Client(cluster)
print(client)

# define our dataset schema
CONTINUOUS_COLUMNS = ["I" + str(x) for x in range(1, 14)]
CATEGORICAL_COLUMNS = ["C" + str(x) for x in range(1, 27)]
LABEL_COLUMNS = ["label"]
COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS + LABEL_COLUMNS

num_buckets = 10000000
categorify_op = Categorify(out_path=stats_path, max_size=num_buckets, dtype='int32')
# categorify_op = Categorify(out_path=stats_path, max_size=num_buckets, dtype=np.zeros(0))
cat_features = CATEGORICAL_COLUMNS >> categorify_op
cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize(out_dtype='float32')
# cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize(out_dtype=np.zeros(0))
label_features = LABEL_COLUMNS >> AddMetadata(
    tags=[str(Tags.BINARY_CLASSIFICATION), "target"]
)
features = cat_features + cont_features + label_features
workflow = nvt.Workflow(features)

dict_dtypes = {}
# The environment variable USE_HUGECTR defines, if we want to use the output for HugeCTR or another framework
for col in CATEGORICAL_COLUMNS:
    dict_dtypes[col] = np.int64 if USE_HUGECTR else np.int32
for col in CONTINUOUS_COLUMNS:
    dict_dtypes[col] = np.float32
for col in LABEL_COLUMNS:
    dict_dtypes[col] = np.int32
print(dict_dtypes)

train_dataset = nvt.Dataset(train_paths, engine="parquet", part_size=part_size)
valid_dataset = nvt.Dataset(valid_paths, engine="parquet", part_size=part_size)

output_train_dir = os.path.join(OUTPUT_DATA_DIR, "train/")
output_valid_dir = os.path.join(OUTPUT_DATA_DIR, "valid/")
# ! mkdir -p $output_train_dir
# ! mkdir -p $output_valid_dir

print(workflow)
workflow.fit(train_dataset)
# train_dataset.fillna(0, inplace=True)
workflow.transform(train_dataset).to_parquet(
    output_files=len(NUM_GPUS),
    output_path=output_train_dir,
    shuffle=nvt.io.Shuffle.PER_PARTITION,
    dtypes=dict_dtypes,
    cats=CATEGORICAL_COLUMNS,
    conts=CONTINUOUS_COLUMNS,
    labels=LABEL_COLUMNS,
)
workflow.transform(valid_dataset).to_parquet(
    output_path=output_valid_dir,
    dtypes=dict_dtypes,
    cats=CATEGORICAL_COLUMNS,
    conts=CONTINUOUS_COLUMNS,
    labels=LABEL_COLUMNS,
)
workflow.save(os.path.join(OUTPUT_DATA_DIR, "workflow"))
I install merlin libraries from the web of https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-tensorflow
@rnyak The training data is the first 40 million rows of day_0 from the Criteo dataset, and the validation data is the first 4 million rows of day_1. The following figure shows a visualization of some of the Parquet data.
@gukejun1 if you have null values then, normally, when you apply the following lines in the NVT workflow, the missing/null values should be filled:
cat_features = CATEGORICAL_COLUMNS >> categorify_op
cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize(out_dtype='float32')
can you share a subset of your parquet file, like only a couple hundred rows, so that we can reproduce the issue? thanks.
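(For illustration, a minimal sketch of FillMissing on a toy frame; the column name I1 and the pandas backend are only for this example, and the behavior is assumed to match the larger workflow above:)

import pandas as pd
import nvtabular as nvt
from nvtabular.ops import FillMissing

# A single continuous column containing a missing value
df = pd.DataFrame({"I1": [1.0, None, 3.0]})
dataset = nvt.Dataset(df)

# FillMissing replaces the NA (default fill value is 0) before any later cast
workflow = nvt.Workflow(["I1"] >> FillMissing())
workflow.fit(dataset)
print(workflow.transform(dataset).to_ddf().compute())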
@rnyak day_1_100.parquet.txt (attached). The data comes from the first 100 rows of Criteo day_1 and was converted to a Parquet file using the official notebook \Merlin\examples\scaling-criteo\01_download_convert.ipynb.
@gukejun1 I used your small dataset with this notebook and all worked fine for me. I cannot reproduce your error.. are you able to reproduce your error only with this small parquet file?
@rnyak Very strange.
@gukejun1 please note that your screenshot shows that you are trying to read in a .parquet.txt file, not a .parquet file, which means your file extension is not correct; it should be .parquet.
@rnyak It's the same data, and the file itself is in Parquet format; only the extension differs. Because GitHub does not allow uploading files with the .parquet extension, I changed the extension to .txt.
@rnyak So, this code didn't work.
@rnyak Because my graphics card supports up to CUDA 11.3, I reinstalled cupy-cuda as cupy-cuda113. Is the error related to this? Does cupy-cuda113 support filling missing values?
@gukejun1 what's your graphics card?
Your sample set does not have any nulls in the label column, so I am skeptical that this line gives you the error. You can remove it and test; you can do it like below:
CONTINUOUS_COLUMNS = ["I" + str(x) for x in range(1, 14)]
CATEGORICAL_COLUMNS = ["C" + str(x) for x in range(1, 27)]
LABEL_COLUMNS = ["label"]
COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS + LABEL_COLUMNS
num_buckets = 10000000
categorify_op = Categorify(out_path=stats_path, max_size=num_buckets, dtype='int32')
cat_features = CATEGORICAL_COLUMNS >> categorify_op
cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize(out_dtype='float32')
features = cat_features + cont_features
workflow = nvt.Workflow(features)
...
...
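(One quick way to see which columns of the attached sample actually contain nulls; a sketch assuming the file has been renamed back to .parquet:)

import pandas as pd

df = pd.read_parquet("day_1_100.parquet")  # the attached sample, renamed from .parquet.txt
print(df.isna().sum())                     # per-column null counts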
@rnyak The error is still reported.
My graphics card is an NVIDIA Tesla P4.
@gukejun1 cudf supports Pascal architecture or better (Compute Capability >= 6.0); see this doc.
can you test if you are able to run notebooks 01 and 02 in this folder?
- can you share your pip list output in a .txt file please?
- can you share the result of nvidia-smi?
- can you tell us your docker image? did you pull merlin-tensorflow:22.12?
thanks.
@gukejun1 the error looks like it comes from pandas, which suggests you are running on CPU, not on GPU... Please confirm that visible_devices from the code below does not come back empty; if it is empty, that means you are not using the GPU:
protocol = "tcp"  # "tcp" or "ucx"
if numba.cuda.is_available():
    NUM_GPUS = list(range(len(numba.cuda.gpus)))
else:
    NUM_GPUS = []
visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Select devices to place workers
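(A quick sanity check from inside the container, assuming numba and the CUDA driver are available:)

import numba.cuda

print(numba.cuda.is_available())  # should print True on a GPU-enabled setup
print(len(numba.cuda.gpus))       # number of visible GPUs; 0 means CPU-only
# Will raise if no GPU is visible; a Tesla P4 reports (6, 1), i.e. Pascal
print(numba.cuda.get_current_device().compute_capability)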
@rnyak For the MovieLens case, notebooks 01 and 02 run successfully.
1. pip list output: requirements.txt (attached)
2. (attached as a screenshot)
3. I used docker pull nvcr.io/nvidia/merlin/merlin-tensorflow:22.12 to get the docker image.
@rnyak It used the GPU.