xorbits
xorbits copied to clipboard
BUG: DataFrame.nunique() and Series.nunique() raise an error on GPU
Note that the issue tracker is NOT the place for general support. For discussions about development, questions about usage, or any general questions, contact us on https://discuss.xorbits.io/.
Reproduce:
a = to_gpu(md.Series([1, 2, 3]))
b = a.nunique().execute()
print(b)
Error:
xorbits/_mars/core/entity/tileables.py:405: in execute
result = self.data.execute(session=session, **kw)
xorbits/_mars/core/entity/executable.py:152: in execute
return execute(self, session=session, **kw)
xorbits/_mars/deploy/oscar/session.py:1877: in execute
return session.execute(
xorbits/_mars/deploy/oscar/session.py:1671: in execute
execution_info: ExecutionInfo = fut.result(
../../../miniconda3/lib/python3.9/concurrent/futures/_base.py:446: in result
return self.__get_result()
../../../miniconda3/lib/python3.9/concurrent/futures/_base.py:391: in __get_result
raise self._exception
xorbits/_mars/deploy/oscar/session.py:1857: in _execute
await execution_info
../../../miniconda3/lib/python3.9/asyncio/tasks.py:688: in _wrap_awaitable
return (yield from awaitable.__await__())
xorbits/_mars/deploy/oscar/session.py:104: in wait
return await self._aio_task
xorbits/_mars/deploy/oscar/session.py:952: in _run_in_background
raise task_result.error.with_traceback(task_result.traceback)
xorbits/_mars/services/task/supervisor/processor.py:387: in run
async for stage_args in self._iter_stage_chunk_graph():
xorbits/_mars/services/task/supervisor/processor.py:171: in _iter_stage_chunk_graph
chunk_graph = await self._get_next_chunk_graph(chunk_graph_iter)
xorbits/_mars/services/task/supervisor/processor.py:162: in _get_next_chunk_graph
chunk_graph = await fut
../../../miniconda3/lib/python3.9/asyncio/threads.py:25: in to_thread
return await loop.run_in_executor(None, func_call)
../../../miniconda3/lib/python3.9/concurrent/futures/thread.py:58: in run
result = self.fn(*self.args, **self.kwargs)
xorbits/_mars/services/task/supervisor/processor.py:157: in next_chunk_graph
return next(chunk_graph_iter)
xorbits/_mars/services/task/supervisor/preprocessor.py:201: in tile
for chunk_graph in chunk_graph_builder.build():
xorbits/_mars/core/graph/builder/chunk.py:431: in build
yield from self._build()
xorbits/_mars/core/graph/builder/chunk.py:425: in _build
graph = next(tile_iterator)
xorbits/_mars/services/task/supervisor/preprocessor.py:87: in _iter_with_check
for chunk_graph in self._iter_without_check():
xorbits/_mars/services/task/supervisor/preprocessor.py:75: in _iter_without_check
to_update_tileables = self._iter()
xorbits/_mars/core/graph/builder/chunk.py:308: in _iter
self._tile(
xorbits/_mars/core/graph/builder/chunk.py:201: in _tile
need_process = next(tile_handler)
xorbits/_mars/core/graph/builder/chunk.py:173: in _tile_handler
tiled_tileables = yield from handler.tile(tiled_tileables)
xorbits/_mars/core/entity/tileables.py:80: in tile
tiled_result = yield from tile_handler(op)
xorbits/_mars/dataframe/reduction/core.py:295: in tile
in_df.agg(
xorbits/_mars/dataframe/reduction/aggregation.py:1047: in aggregate
if not is_funcs_aggregate(func, func_kw=kw, ndim=df.ndim):
xorbits/_mars/dataframe/reduction/aggregation.py:969: in is_funcs_aggregate
compiler.add_function(f, 1)
xorbits/_mars/dataframe/reduction/core.py:864: in add_function
compile_result = self._compile_function(func, func_name, ndim=ndim)
xorbits/_mars/core/mode.py:78: in _inner
return func(*args, **kwargs)
xorbits/_mars/dataframe/reduction/core.py:931: in _compile_function
func_ret = self._build_mock_return_object(func, object, ndim=1)
xorbits/_mars/dataframe/reduction/core.py:909: in _build_mock_return_object
return func(mock_obj)
xorbits/_mars/dataframe/reduction/core.py:675: in __call__
return build_custom_reduction_result(value, self)
xorbits/_mars/dataframe/reduction/custom_reduction.py:46: in build_custom_reduction_result
return op(df)
xorbits/_mars/dataframe/reduction/core.py:432: in __call__
return self._call_series(a)
xorbits/_mars/dataframe/reduction/core.py:410: in _call_series
result_scalar = getattr(self, "custom_reduction").__call_agg__(empty_series)
xorbits/_mars/dataframe/reduction/core.py:679: in __call_agg__
r = self.pre(value)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <xorbits._mars.dataframe.reduction.nunique.NuniqueReduction object at 0x7f41cf716b20>
in_data = 1 O1
dtype: object
def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
xdf = cudf if self.is_gpu() else pd
if isinstance(in_data, xdf.Series):
unique_values = in_data.drop_duplicates()
return xdf.Series(unique_values, name=in_data.name)
else:
if self._axis == 0:
data = dict()
for d, v in in_data.iteritems():
if not self._use_arrow_dtype or xdf is cudf:
> data[d] = [v.drop_duplicates().to_list()]
E AttributeError: 'str' object has no attribute 'drop_duplicates'
xorbits/_mars/dataframe/reduction/nunique.py:65: AttributeError
While debugging, I also found another issue: NuniqueReduction converts all of the data to CPU (pandas) objects in order to perform the calculation.