xorbits icon indicating copy to clipboard operation
xorbits copied to clipboard

BUG: DataFrame.nunique() and Series.nunique() raise an error on GPU

Open ChengjieLi28 opened this issue 1 year ago • 0 comments

Note that the issue tracker is NOT the place for general support. For discussions about development, questions about usage, or any general questions, contact us on https://discuss.xorbits.io/.

Reproduce:

a = to_gpu(md.Series([1, 2, 3]))
b = a.nunique().execute()
print(b)

Error:

xorbits/_mars/core/entity/tileables.py:405: in execute
    result = self.data.execute(session=session, **kw)
xorbits/_mars/core/entity/executable.py:152: in execute
    return execute(self, session=session, **kw)
xorbits/_mars/deploy/oscar/session.py:1877: in execute
    return session.execute(
xorbits/_mars/deploy/oscar/session.py:1671: in execute
    execution_info: ExecutionInfo = fut.result(
../../../miniconda3/lib/python3.9/concurrent/futures/_base.py:446: in result
    return self.__get_result()
../../../miniconda3/lib/python3.9/concurrent/futures/_base.py:391: in __get_result
    raise self._exception
xorbits/_mars/deploy/oscar/session.py:1857: in _execute
    await execution_info
../../../miniconda3/lib/python3.9/asyncio/tasks.py:688: in _wrap_awaitable
    return (yield from awaitable.__await__())
xorbits/_mars/deploy/oscar/session.py:104: in wait
    return await self._aio_task
xorbits/_mars/deploy/oscar/session.py:952: in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
xorbits/_mars/services/task/supervisor/processor.py:387: in run
    async for stage_args in self._iter_stage_chunk_graph():
xorbits/_mars/services/task/supervisor/processor.py:171: in _iter_stage_chunk_graph
    chunk_graph = await self._get_next_chunk_graph(chunk_graph_iter)
xorbits/_mars/services/task/supervisor/processor.py:162: in _get_next_chunk_graph
    chunk_graph = await fut
../../../miniconda3/lib/python3.9/asyncio/threads.py:25: in to_thread
    return await loop.run_in_executor(None, func_call)
../../../miniconda3/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
xorbits/_mars/services/task/supervisor/processor.py:157: in next_chunk_graph
    return next(chunk_graph_iter)
xorbits/_mars/services/task/supervisor/preprocessor.py:201: in tile
    for chunk_graph in chunk_graph_builder.build():
xorbits/_mars/core/graph/builder/chunk.py:431: in build
    yield from self._build()
xorbits/_mars/core/graph/builder/chunk.py:425: in _build
    graph = next(tile_iterator)
xorbits/_mars/services/task/supervisor/preprocessor.py:87: in _iter_with_check
    for chunk_graph in self._iter_without_check():
xorbits/_mars/services/task/supervisor/preprocessor.py:75: in _iter_without_check
    to_update_tileables = self._iter()
xorbits/_mars/core/graph/builder/chunk.py:308: in _iter
    self._tile(
xorbits/_mars/core/graph/builder/chunk.py:201: in _tile
    need_process = next(tile_handler)
xorbits/_mars/core/graph/builder/chunk.py:173: in _tile_handler
    tiled_tileables = yield from handler.tile(tiled_tileables)
xorbits/_mars/core/entity/tileables.py:80: in tile
    tiled_result = yield from tile_handler(op)
xorbits/_mars/dataframe/reduction/core.py:295: in tile
    in_df.agg(
xorbits/_mars/dataframe/reduction/aggregation.py:1047: in aggregate
    if not is_funcs_aggregate(func, func_kw=kw, ndim=df.ndim):
xorbits/_mars/dataframe/reduction/aggregation.py:969: in is_funcs_aggregate
    compiler.add_function(f, 1)
xorbits/_mars/dataframe/reduction/core.py:864: in add_function
    compile_result = self._compile_function(func, func_name, ndim=ndim)
xorbits/_mars/core/mode.py:78: in _inner
    return func(*args, **kwargs)
xorbits/_mars/dataframe/reduction/core.py:931: in _compile_function
    func_ret = self._build_mock_return_object(func, object, ndim=1)
xorbits/_mars/dataframe/reduction/core.py:909: in _build_mock_return_object
    return func(mock_obj)
xorbits/_mars/dataframe/reduction/core.py:675: in __call__
    return build_custom_reduction_result(value, self)
xorbits/_mars/dataframe/reduction/custom_reduction.py:46: in build_custom_reduction_result
    return op(df)
xorbits/_mars/dataframe/reduction/core.py:432: in __call__
    return self._call_series(a)
xorbits/_mars/dataframe/reduction/core.py:410: in _call_series
    result_scalar = getattr(self, "custom_reduction").__call_agg__(empty_series)
xorbits/_mars/dataframe/reduction/core.py:679: in __call_agg__
    r = self.pre(value)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <xorbits._mars.dataframe.reduction.nunique.NuniqueReduction object at 0x7f41cf716b20>
in_data = 1    O1
dtype: object

    def pre(self, in_data):  # noqa: W0221  # pylint: disable=arguments-differ
        xdf = cudf if self.is_gpu() else pd
        if isinstance(in_data, xdf.Series):
            unique_values = in_data.drop_duplicates()
            return xdf.Series(unique_values, name=in_data.name)
        else:
            if self._axis == 0:
                data = dict()
                for d, v in in_data.iteritems():
                    if not self._use_arrow_dtype or xdf is cudf:
>                       data[d] = [v.drop_duplicates().to_list()]
E                       AttributeError: 'str' object has no attribute 'drop_duplicates'

xorbits/_mars/dataframe/reduction/nunique.py:65: AttributeError

While debugging, I found another issue: NuniqueReduction moves all the data to the CPU (pandas) to perform the calculation.

ChengjieLi28 avatar Apr 10 '23 09:04 ChengjieLi28