Merlin
[QST] out of memory error while trying out examples in jupyter notebook
❓ Questions & Help
Details
Hi, I have experienced a CUDA out-of-memory error while trying out the examples in the Jupyter notebook files. Is there a neat way of solving this error?
os: Ubuntu 20.04
gpu: NVIDIA GeForce RTX 3060 Ti
cuda: 11.8
cudnn: 8
docker version: 24.0.1
docker image: nvcr.io/nvidia/merlin/merlin-pytorch-training:22.03
ipynb file: 02-ETL-with-NVTabular.ipynb
The code that throws the error cudaErrorMemoryAllocation out of memory:
train_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "train.parquet")])
valid_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "valid.parquet")])
%%time
workflow.fit(train_dataset)
@Bartoliinii hello. Can you clarify the following:
- Which 02 notebook is that? The one in the getting-started-movielens folder?
- Are you using your own dataset or the dataset used in the notebook example?
- Are you running the notebook as in the docker image, or are you running the notebook from the Merlin repo main branch? These two can be different.
Besides, the docker image version you mentioned above is about one year old. Is it possible for you to use our latest stable docker image? Can you please test the merlin-pytorch-training:23.04 image?
Hi @rnyak thanks for responding.
Which 02 notebook is that? The one in the getting-started-movielens folder?
It was this notebook: https://nvidia-merlin.github.io/NVTabular/v0.7.1/examples/getting-started-movielens/02-ETL-with-NVTabular.html
Are you using your own dataset or the dataset used in the notebook example?
I was running the cells as they are, with no custom inputs on my part.
Are you running the notebook as in the docker image, or are you running the notebook from the Merlin repo main branch? These two can be different.
I was running it from the docker image.
Besides, the docker image version you mentioned above is about one year old. Is it possible for you to use our latest stable docker image? Can you please test the merlin-pytorch-training:23.04 image?
I know, but the last time I tried to use a newer docker image with TensorFlow I had to update the drivers for my NVIDIA graphics card, and that update wrecked my CUDA installation.
- One additional question: is there a way to load a parquet file with NVT without filling GPU memory? Something that resembles dask_cudf.read_parquet?
One additional question: is there a way to load a parquet file with NVT without filling GPU memory?
You can read the data directly from a path into NVT, like this:
dataset = nvt.Dataset(<path to your parquet files>)
workflow.fit_transform(dataset)
...
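For reference, a minimal self-contained sketch of that pattern (the path, the columns, and the Categorify-based workflow here are placeholders for illustration, not the notebook's actual pipeline; part_size is the knob discussed further down in this thread):
import os
import nvtabular as nvt
from nvtabular import ops

# Hypothetical location of the parquet files.
INPUT_DATA_DIR = os.environ.get("INPUT_DATA_DIR", "/workspace/data")

# Point the Dataset at the files on disk; small partitions keep each chunk
# within GPU memory instead of loading the whole file at once.
dataset = nvt.Dataset(
    os.path.join(INPUT_DATA_DIR, "train.parquet"),
    engine="parquet",
    part_size="128MB",
)

# Placeholder workflow: categorify two id columns.
features = ["userId", "movieId"] >> ops.Categorify()
workflow = nvt.Workflow(features)

# fit and transform work partition by partition on the lazily loaded data.
workflow.fit(dataset)
workflow.transform(dataset).to_parquet(os.path.join(INPUT_DATA_DIR, "train_nvt"))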
does that answer your question?
btw, do you have an 8 GB machine?
does that answer your question?
Yes, thank you.
btw, do you have an 8 GB machine?
Yes, I do.
@Bartoliinii I noticed that you are following the examples on the NVT website. Please use these notebooks instead:
https://github.com/NVIDIA-Merlin/Merlin/tree/main/examples/getting-started-movielens
thanks.
Thank you for the tip. I'm now following the instructions inside this TensorFlow docker container:
nvcr.io/nvidia/merlin/merlin-tensorflow:23.02
In /Merlin/examples/getting-started-movielens/02-ETL-with-NVTabular.ipynb I get the same error while running the 14th cell:
Cell 13:
train_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "train.parquet")])
valid_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "valid.parquet")])
Cell 14:
%%time
workflow.fit(train_dataset)
Output:
MemoryError: Parquet data was larger than the available GPU memory!
Is there a way to fit the workflow to a dataset without filling GPU memory?
Is there a way to fit the workflow to a dataset without filling GPU memory?
- Please be sure there are no other kernels (notebooks) open; shut them down.
- Check your GPU consumption before you start notebook 02: you can run watch -n0.1 nvidia-smi in your terminal, and be sure memory consumption is 0 or very low at the beginning.
- In the 02-ETL-with-NVTabular.ipynb notebook, if you notice, we read the parquet file with cudf upfront, so a bit of GPU memory (< 1 GB) is used before you even reach the NVT pipeline. You can change all df_lib.read_parquet calls to pandas.read_parquet(..); the original cell and a CPU-side sketch of the change are shown below:
movies = df_lib.read_parquet(os.path.join(INPUT_DATA_DIR, "movies_converted.parquet"))
movies.head()
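For example, the cudf-based read above can be swapped for a CPU-side read like this (a sketch; only the reading library changes, the rest of the cell stays the same):
import os
import pandas as pd

# Read the lookup table on the CPU so no GPU memory is used before the NVT pipeline.
movies = pd.read_parquet(os.path.join(INPUT_DATA_DIR, "movies_converted.parquet"))
movies.head()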
Note that joined = userId + movieId >> JoinExternal(movies, on=["movieId"]) consumes some GPU memory, but it is also less than 1 GB.
Creating the dataset with train_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "train.parquet")]) does not by itself fill up or use GPU memory. You can keep watch -n0.1 nvidia-smi running in your terminal and see which cell actually fills up or uses GPU memory. The moment you call workflow.fit(), your GPU starts to be used, but it is less than 8 GB, so you should be good.
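If you prefer to watch this from inside the notebook instead of a second terminal, here is a small sketch using pynvml (assuming the nvidia-ml-py / pynvml package is available in the container; workflow and train_dataset are the objects from the notebook):
import pynvml

pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)  # GPU 0

def gpu_mem_used_mb():
    # nvmlDeviceGetMemoryInfo reports bytes; convert to MB for readability.
    return pynvml.nvmlDeviceGetMemoryInfo(handle).used / 1024**2

print(f"GPU memory used before fit: {gpu_mem_used_mb():.0f} MB")
workflow.fit(train_dataset)
print(f"GPU memory used after fit:  {gpu_mem_used_mb():.0f} MB")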
- Please try to set the part_size arg as follows:
train_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "train.parquet")], part_size='128MB')
valid_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "valid.parquet")], part_size='128MB')
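To confirm the smaller partitions took effect, you can inspect the dask collection behind the NVTabular Dataset (a sketch; to_ddf() exposes the underlying dask dataframe, and more, smaller partitions mean each chunk moved to the GPU is smaller):
# Inspect the partitioning of the re-created datasets.
ddf = train_dataset.to_ddf()
print("train partitions:", ddf.npartitions)
print("columns:", list(ddf.columns))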
Please be sure you are not using anything like cudf or dask_cudf that uses your GPU memory before NVT. You can check this using the command I shared above and find out which cell is consuming your memory.
- I made sure no other notebooks are running:
root@c16d4c741902:/Merlin/examples/getting-started-movielens# jupyter notebook list
Currently running servers:
- Before running the 02-ETL-with-NVTabular.ipynb notebook, GPU memory consumption was at 6.2%.
- I changed df_lib.read_parquet to pandas.read_parquet in cell 3.
- Just before running workflow.fit(train_dataset), GPU memory consumption was at 13.4% because of the joined = userId + movieId >> JoinExternal(movies, on=["movieId"]) cell.
Unfortunately, the error still remains. Here is the full output:
/usr/local/lib/python3.8/dist-packages/merlin/schema/tags.py:148: UserWarning: Compound tags like Tags.USER_ID have been deprecated and will be removed in a future version. Please use the atomic versions of these tags, like [<Tags.USER: 'user'>, <Tags.ID: 'id'>].
warnings.warn(
/usr/local/lib/python3.8/dist-packages/merlin/schema/tags.py:148: UserWarning: Compound tags like Tags.ITEM_ID have been deprecated and will be removed in a future version. Please use the atomic versions of these tags, like [<Tags.ITEM: 'item'>, <Tags.ID: 'id'>].
warnings.warn(
Failed to transform operator <nvtabular.ops.categorify.Categorify object at 0x7f0314138190>
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/nvtabular/ops/categorify.py", line 463, in transform
encoded = _encode(
File "/usr/local/lib/python3.8/dist-packages/nvtabular/ops/categorify.py", line 1427, in _encode
labels = codes.merge(
File "/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py", line 101, in inner
result = func(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/cudf/core/dataframe.py", line 3759, in merge
return merge_cls(
File "/usr/local/lib/python3.8/dist-packages/cudf/core/join/join.py", line 200, in perform_merge
self.rhs._gather(gather_map=right_rows, **gather_kwargs)
File "/usr/local/lib/python3.8/dist-packages/cudf/core/indexed_frame.py", line 1506, in _gather
libcudf.copying.gather(
File "copying.pyx", line 185, in cudf._lib.copying.gather
MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /usr/local/include/rmm/mr/device/cuda_memory_resource.hpp:70: cudaErrorMemoryAllocation out of memory
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py", line 183, in _transform_data
output_data = node.op.transform(selection, input_data)
File "/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py", line 101, in inner
result = func(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/nvtabular/ops/categorify.py", line 483, in transform
raise RuntimeError(f"Failed to categorical encode column {name}") from e
RuntimeError: Failed to categorical encode column genres
---------------------------------------------------------------------------
MemoryError Traceback (most recent call last)
File /usr/local/lib/python3.8/dist-packages/nvtabular/ops/categorify.py:463, in Categorify.transform(self, col_selector, df)
461 path = self.categories[storage_name]
--> 463 encoded = _encode(
464 use_name,
465 storage_name,
466 path,
467 df,
468 self.cat_cache,
469 na_sentinel=self.na_sentinel,
470 freq_threshold=self.freq_threshold[name]
471 if isinstance(self.freq_threshold, dict)
472 else self.freq_threshold,
473 search_sorted=self.search_sorted,
474 buckets=self.num_buckets,
475 encode_type=self.encode_type,
476 cat_names=column_names,
477 max_size=self.max_size,
478 dtype=self.output_dtype,
479 start_index=self.start_index,
480 )
481 new_df[name] = encoded
File /usr/local/lib/python3.8/dist-packages/nvtabular/ops/categorify.py:1427, in _encode(name, storage_name, path, df, cat_cache, na_sentinel, freq_threshold, search_sorted, buckets, encode_type, cat_names, max_size, dtype, start_index)
1426 na_sentinel = 0
-> 1427 labels = codes.merge(
1428 value, left_on=selection_l.names, right_on=selection_r.names, how="left"
1429 ).sort_values("order")["labels"]
1430 labels.fillna(na_sentinel, inplace=True)
File /usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101, in annotate.__call__.<locals>.inner(*args, **kwargs)
100 libnvtx_push_range(self.attributes, self.domain.handle)
--> 101 result = func(*args, **kwargs)
102 libnvtx_pop_range(self.domain.handle)
File /usr/local/lib/python3.8/dist-packages/cudf/core/dataframe.py:3759, in DataFrame.merge(self, right, on, left_on, right_on, left_index, right_index, how, sort, lsuffix, rsuffix, indicator, suffixes)
3757 merge_cls = MergeSemi
-> 3759 return merge_cls(
3760 lhs,
3761 rhs,
3762 on=on,
3763 left_on=left_on,
3764 right_on=right_on,
3765 left_index=left_index,
3766 right_index=right_index,
3767 how=how,
3768 sort=sort,
3769 indicator=indicator,
3770 suffixes=suffixes,
3771 ).perform_merge()
File /usr/local/lib/python3.8/dist-packages/cudf/core/join/join.py:200, in Merge.perform_merge(self)
194 left_result = (
195 self.lhs._gather(gather_map=left_rows, **gather_kwargs)
196 if left_rows is not None
197 else cudf.DataFrame._from_data({})
198 )
199 right_result = (
--> 200 self.rhs._gather(gather_map=right_rows, **gather_kwargs)
201 if right_rows is not None
202 else cudf.DataFrame._from_data({})
203 )
205 result = cudf.DataFrame._from_data(
206 *self._merge_results(left_result, right_result)
207 )
File /usr/local/lib/python3.8/dist-packages/cudf/core/indexed_frame.py:1506, in IndexedFrame._gather(self, gather_map, keep_index, nullify, check_bounds)
1503 raise IndexError("Gather map index is out of bounds.")
1505 return self._from_columns_like_self(
-> 1506 libcudf.copying.gather(
1507 list(self._index._columns + self._columns)
1508 if keep_index
1509 else list(self._columns),
1510 gather_map,
1511 nullify=nullify,
1512 ),
1513 self._column_names,
1514 self._index.names if keep_index else None,
1515 )
File copying.pyx:185, in cudf._lib.copying.gather()
MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /usr/local/include/rmm/mr/device/cuda_memory_resource.hpp:70: cudaErrorMemoryAllocation out of memory
The above exception was the direct cause of the following exception:
RuntimeError Traceback (most recent call last)
File <timed eval>:1
File /usr/local/lib/python3.8/dist-packages/nvtabular/workflow/workflow.py:213, in Workflow.fit(self, dataset)
209 dependencies.difference_update(current_phase)
211 # This captures the output dtypes of operators like LambdaOp where
212 # the dtype can't be determined without running the transform
--> 213 self._transform_impl(dataset, capture_dtypes=True).sample_dtypes()
214 self.graph.construct_schema(dataset.schema, preserve_dtypes=True)
216 return self
File /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:1155, in Dataset.sample_dtypes(self, n, annotate_lists)
1148 """Return the real dtypes of the Dataset
1149
1150 Use cached metadata if this operation was
1151 already performed. Otherwise, call down to the
1152 underlying engine for sampling logic.
1153 """
1154 if self._real_meta.get(n, None) is None:
-> 1155 _real_meta = self.engine.sample_data(n=n)
1156 if self.dtypes:
1157 _real_meta = _set_dtypes(_real_meta, self.dtypes)
File /usr/local/lib/python3.8/dist-packages/merlin/io/dataset_engine.py:71, in DatasetEngine.sample_data(self, n)
69 _ddf = self.to_ddf()
70 for partition_index in range(_ddf.npartitions):
---> 71 _head = _ddf.partitions[partition_index].head(n)
72 if len(_head):
73 return _head
File /usr/local/lib/python3.8/dist-packages/dask/dataframe/core.py:1256, in _Frame.head(self, n, npartitions, compute)
1254 # No need to warn if we're already looking at all partitions
1255 safe = npartitions != self.npartitions
-> 1256 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)
File /usr/local/lib/python3.8/dist-packages/dask/dataframe/core.py:1290, in _Frame._head(self, n, npartitions, compute, safe)
1285 result = new_dd_object(
1286 graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
1287 )
1289 if compute:
-> 1290 result = result.compute()
1291 return result
File /usr/local/lib/python3.8/dist-packages/dask/base.py:315, in DaskMethodsMixin.compute(self, **kwargs)
291 def compute(self, **kwargs):
292 """Compute this dask collection
293
294 This turns a lazy Dask collection into its in-memory equivalent.
(...)
313 dask.base.compute
314 """
--> 315 (result,) = compute(self, traverse=False, **kwargs)
316 return result
File /usr/local/lib/python3.8/dist-packages/dask/base.py:598, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
595 keys.append(x.__dask_keys__())
596 postcomputes.append(x.__dask_postcompute__())
--> 598 results = schedule(dsk, keys, **kwargs)
599 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/local/lib/python3.8/dist-packages/dask/local.py:557, in get_sync(dsk, keys, **kwargs)
552 """A naive synchronous version of get_async
553
554 Can be useful for debugging.
555 """
556 kwargs.pop("num_workers", None) # if num_workers present, remove it
--> 557 return get_async(
558 synchronous_executor.submit,
559 synchronous_executor._max_workers,
560 dsk,
561 keys,
562 **kwargs,
563 )
File /usr/local/lib/python3.8/dist-packages/dask/local.py:500, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
498 while state["waiting"] or state["ready"] or state["running"]:
499 fire_tasks(chunksize)
--> 500 for key, res_info, failed in queue_get(queue).result():
501 if failed:
502 exc, tb = loads(res_info)
File /usr/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
435 raise CancelledError()
436 elif self._state == FINISHED:
--> 437 return self.__get_result()
439 self._condition.wait(timeout)
441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
File /usr/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
387 if self._exception:
388 try:
--> 389 raise self._exception
390 finally:
391 # Break a reference cycle with the exception in self._exception
392 self = None
File /usr/local/lib/python3.8/dist-packages/dask/local.py:542, in SynchronousExecutor.submit(self, fn, *args, **kwargs)
540 fut = Future()
541 try:
--> 542 fut.set_result(fn(*args, **kwargs))
543 except BaseException as e:
544 fut.set_exception(e)
File /usr/local/lib/python3.8/dist-packages/dask/local.py:238, in batch_execute_tasks(it)
234 def batch_execute_tasks(it):
235 """
236 Batch computing of multiple tasks with `execute_task`
237 """
--> 238 return [execute_task(*a) for a in it]
File /usr/local/lib/python3.8/dist-packages/dask/local.py:238, in <listcomp>(.0)
234 def batch_execute_tasks(it):
235 """
236 Batch computing of multiple tasks with `execute_task`
237 """
--> 238 return [execute_task(*a) for a in it]
File /usr/local/lib/python3.8/dist-packages/dask/local.py:229, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
227 failed = False
228 except BaseException as e:
--> 229 result = pack_exception(e, dumps)
230 failed = True
231 return key, result, failed
File /usr/local/lib/python3.8/dist-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
222 try:
223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
225 id = get_id()
226 result = dumps((result, id))
File /usr/local/lib/python3.8/dist-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/local/lib/python3.8/dist-packages/dask/optimization.py:990, in SubgraphCallable.__call__(self, *args)
988 if not len(args) == len(self.inkeys):
989 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File /usr/local/lib/python3.8/dist-packages/dask/core.py:149, in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
File /usr/local/lib/python3.8/dist-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/local/lib/python3.8/dist-packages/dask/utils.py:41, in apply(func, args, kwargs)
39 def apply(func, args, kwargs=None):
40 if kwargs:
---> 41 return func(*args, **kwargs)
42 else:
43 return func(*args)
File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:72, in LocalExecutor.transform(self, transformable, graph, output_dtypes, additional_columns, capture_dtypes, validate_dtypes)
69 output_data = None
71 for node in nodes:
---> 72 input_data = self._build_input_data(
73 node, transformable, capture_dtypes=capture_dtypes, validate_dtypes=validate_dtypes
74 )
76 if node.op:
77 transformed_data = self._transform_data(
78 node, input_data, capture_dtypes=capture_dtypes, validate_dtypes=validate_dtypes
79 )
File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:121, in LocalExecutor._build_input_data(self, node, transformable, capture_dtypes, validate_dtypes)
119 for parent in node.parents_with_dependencies:
120 parent_output_cols = _get_unique(parent.output_schema.column_names)
--> 121 parent_data = self.transform(
122 transformable,
123 [parent],
124 capture_dtypes=capture_dtypes,
125 validate_dtypes=validate_dtypes,
126 )
127 if input_data is None or not len(input_data):
128 input_data = parent_data[parent_output_cols]
File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:77, in LocalExecutor.transform(self, transformable, graph, output_dtypes, additional_columns, capture_dtypes, validate_dtypes)
72 input_data = self._build_input_data(
73 node, transformable, capture_dtypes=capture_dtypes, validate_dtypes=validate_dtypes
74 )
76 if node.op:
---> 77 transformed_data = self._transform_data(
78 node, input_data, capture_dtypes=capture_dtypes, validate_dtypes=validate_dtypes
79 )
80 else:
81 transformed_data = input_data
File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:183, in LocalExecutor._transform_data(self, node, input_data, capture_dtypes, validate_dtypes)
180 try:
181 # use input_columns to ensure correct grouping (subgroups)
182 selection = node.input_columns.resolve(node.input_schema)
--> 183 output_data = node.op.transform(selection, input_data)
185 # Update or validate output_data dtypes
186 if capture_dtypes or validate_dtypes:
File /usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101, in annotate.__call__.<locals>.inner(*args, **kwargs)
98 @wraps(func)
99 def inner(*args, **kwargs):
100 libnvtx_push_range(self.attributes, self.domain.handle)
--> 101 result = func(*args, **kwargs)
102 libnvtx_pop_range(self.domain.handle)
103 return result
File /usr/local/lib/python3.8/dist-packages/nvtabular/ops/categorify.py:483, in Categorify.transform(self, col_selector, df)
481 new_df[name] = encoded
482 except Exception as e:
--> 483 raise RuntimeError(f"Failed to categorical encode column {name}") from e
485 return new_df
RuntimeError: Failed to categorical encode column genres
I ran the notebooks in JupyterLab and got the same error. I saw that my GPU memory was nearly full because a previous kernel was still running; I went to that notebook and killed the kernel, which freed up the memory. I also limited the partition size to 128 MB:
train_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "train.parquet")], part_size='128MB')
valid_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "valid.parquet")], part_size='128MB')