xorbits
xorbits copied to clipboard
BUG: `rechunk` fails on a Series with Arrow string dtype
Describe the bug
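Calling `rechunk` on a Series with the Arrow-backed string dtype (`string[pyarrow]`) fails with `AttributeError: 'StringDtype' object has no attribute 'itemsize'`.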
To Reproduce
To help us reproduce this bug, please provide the information below:
- Your Python version
- The version of Xorbits you use
- Versions of crucial packages, such as numpy, scipy and pandas
- Full stack trace of the error.
- Minimized code to reproduce the error.
In [1]: import xorbits.pandas as pd
In [2]: s = pd.Series(['a', 'b', 'd'], dtype='string[pyarrow]')
In [3]: s
Out[3]:
0 a
1 b
2 d
dtype: string
In [4]: s.rechunk(2)
Out[4]: /Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1924: UserWarning: No existing session found, creating a new local session now.
warnings.warn(warning_msg)
2023-09-18 15:19:13,907 xorbits._mars.deploy.oscar.local 24904 WARNING Web service started at http://0.0.0.0:18711
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100.00/100 [00:00<00:00, 35311.53it/s]
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
File ~/miniconda3/lib/python3.9/site-packages/IPython/core/formatters.py:706, in PlainTextFormatter.__call__(self, obj)
699 stream = StringIO()
700 printer = pretty.RepresentationPrinter(stream, self.verbose,
701 self.max_width, self.newline,
702 max_seq_length=self.max_seq_length,
703 singleton_pprinters=self.singleton_printers,
704 type_pprinters=self.type_printers,
705 deferred_pprinters=self.deferred_printers)
--> 706 printer.pretty(obj)
707 printer.flush()
708 return stream.getvalue()
File ~/miniconda3/lib/python3.9/site-packages/IPython/lib/pretty.py:410, in RepresentationPrinter.pretty(self, obj)
407 return meth(obj, self, cycle)
408 if cls is not object \
409 and callable(cls.__dict__.get('__repr__')):
--> 410 return _repr_pprint(obj, self, cycle)
412 return _default_pprint(obj, self, cycle)
413 finally:
File ~/miniconda3/lib/python3.9/site-packages/IPython/lib/pretty.py:778, in _repr_pprint(obj, p, cycle)
776 """A pprint that just redirects to the normal repr function."""
777 # Find newlines and replace them with p.break_()
--> 778 output = repr(obj)
779 lines = output.splitlines()
780 with p.group():
File ~/Workspace/xorbits/python/xorbits/utils.py:38, in safe_repr_str.<locals>.inn(self, *args, **kwargs)
36 return getattr(object, f.__name__)(self)
37 else:
---> 38 return f(self, *args, **kwargs)
File ~/Workspace/xorbits/python/xorbits/core/data.py:310, in DataRef.__repr__(self)
308 return self.data._mars_entity.op.data.__repr__()
309 else:
--> 310 run(self)
311 return self.data.__repr__()
File ~/Workspace/xorbits/python/xorbits/core/execution.py:55, in run(obj, **kwargs)
53 mars_tileables = [_get_mars_entity(ref) for ref in refs_to_execute.values()]
54 if mars_tileables:
---> 55 mars_execute(mars_tileables, **kwargs)
File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1760, in execute(tileable, session, wait, new_session_kwargs, show_progress, progress_update_interval, *tileables, **kwargs)
1758 session = get_default_or_create(**(new_session_kwargs or dict()))
1759 session = _ensure_sync(session)
-> 1760 return session.execute(
1761 tileable,
1762 *tileables,
1763 wait=wait,
1764 show_progress=show_progress,
1765 progress_update_interval=progress_update_interval,
1766 **kwargs,
1767 )
File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1576, in SyncSession.execute(self, tileable, show_progress, warn_duplicated_execution, *tileables, **kwargs)
1574 fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
1575 try:
-> 1576 execution_info: ExecutionInfo = fut.result(
1577 timeout=self._isolated_session.timeout
1578 )
1579 except KeyboardInterrupt: # pragma: no cover
1580 logger.warning("Cancelling running task")
File ~/miniconda3/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
444 raise CancelledError()
445 elif self._state == FINISHED:
--> 446 return self.__get_result()
447 else:
448 raise TimeoutError()
File ~/miniconda3/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
389 if self._exception:
390 try:
--> 391 raise self._exception
392 finally:
393 # Break a reference cycle with the exception in self._exception
394 self = None
File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1740, in _execute(session, wait, show_progress, progress_update_interval, cancelled, *tileables, **kwargs)
1737 else:
1738 # set cancelled to avoid wait task leak
1739 cancelled.set()
-> 1740 await execution_info
1741 else:
1742 return execution_info
File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:124, in ExecutionInfo._ensure_future.<locals>.wait()
123 async def wait():
--> 124 return await self._aio_task
File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:873, in _IsolatedSession._run_in_background(self, tileables, task_id, progress, profiling)
867 logger.warning(
868 "Profile task %s execution result:\n%s",
869 task_id,
870 json.dumps(task_result.profiling, indent=4),
871 )
872 if task_result.error:
--> 873 raise task_result.error.with_traceback(task_result.traceback)
874 if cancelled:
875 return
File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:387, in TaskProcessor.run(self)
382 self._tileable_id_to_tileable = await asyncio.to_thread(
383 self._get_tileable_id_to_tileable, self._preprocessor.tileable_graph
384 )
386 async with self._executor:
--> 387 async for stage_args in self._iter_stage_chunk_graph():
388 await self._process_stage_chunk_graph(*stage_args)
389 await self._task_info_collector.collect_result_nodes(
390 self._task, self._subtask_graphs
391 )
File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:171, in TaskProcessor._iter_stage_chunk_graph(self)
169 with Timer() as stage_timer:
170 with Timer() as timer:
--> 171 chunk_graph = await self._get_next_chunk_graph(chunk_graph_iter)
172 if chunk_graph is None:
173 # tile finished
174 self._preprocessor.done = True
File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:162, in TaskProcessor._get_next_chunk_graph(chunk_graph_iter)
159 return
161 fut = asyncio.to_thread(next_chunk_graph)
--> 162 chunk_graph = await fut
163 return chunk_graph
File ~/miniconda3/lib/python3.9/asyncio/threads.py:25, in to_thread(func, *args, **kwargs)
23 ctx = contextvars.copy_context()
24 func_call = functools.partial(ctx.run, func, *args, **kwargs)
---> 25 return await loop.run_in_executor(None, func_call)
File ~/miniconda3/lib/python3.9/concurrent/futures/thread.py:58, in _WorkItem.run(self)
55 return
57 try:
---> 58 result = self.fn(*self.args, **self.kwargs)
59 except BaseException as exc:
60 self.future.set_exception(exc)
File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:157, in TaskProcessor._get_next_chunk_graph.<locals>.next_chunk_graph()
155 def next_chunk_graph():
156 try:
--> 157 return next(chunk_graph_iter)
158 except StopIteration:
159 return
File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/preprocessor.py:201, in TaskPreprocessor.tile(self, tileable_graph)
199 if hasattr(t.op, "logic_key") and t.op.logic_key is None:
200 t.op.logic_key = t.op.get_logic_key()
--> 201 for chunk_graph in chunk_graph_builder.build():
202 if len(chunk_graph) == 0:
203 continue
File ~/Workspace/xorbits/python/xorbits/_mars/core/graph/builder/chunk.py:431, in ChunkGraphBuilder.build(self)
430 def build(self) -> Generator[Union[TileableGraph, ChunkGraph], None, None]:
--> 431 yield from self._build()
File ~/Workspace/xorbits/python/xorbits/_mars/core/graph/builder/chunk.py:425, in ChunkGraphBuilder._build(self)
423 try:
424 with enter_mode(build=True, kernel=True):
--> 425 graph = next(tile_iterator)
426 yield graph
427 except StopIteration:
File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/preprocessor.py:75, in CancellableTiler._iter_without_check(self)
73 def _iter_without_check(self):
74 while self._tileable_handlers:
---> 75 to_update_tileables = self._iter()
76 if not self.cancelled:
77 yield self._cur_chunk_graph
File ~/Workspace/xorbits/python/xorbits/_mars/core/graph/builder/chunk.py:308, in Tiler._iter(self)
306 # tile
307 for tile_handler in self._gen_tileable_handlers(next_tileable_handlers):
--> 308 self._tile(
309 chunk_graph,
310 tile_handler.tileable,
311 tile_handler.handler,
312 next_tileable_handlers,
313 to_update_tileables,
314 visited,
315 )
316 self._tileable_handlers = next_tileable_handlers
317 # gen result chunks
File ~/Workspace/xorbits/python/xorbits/_mars/core/graph/builder/chunk.py:201, in Tiler._tile(self, chunk_graph, tileable, tile_handler, next_tileable_handlers, to_update_tileables, visited)
191 def _tile(
192 self,
193 chunk_graph: ChunkGraph,
(...)
198 visited: Set[EntityType],
199 ):
200 try:
--> 201 need_process = next(tile_handler)
203 if isinstance(need_process, TileStatus):
204 # process tile that returns progress
205 self._tile_context.set_progress(tileable, need_process.progress)
File ~/Workspace/xorbits/python/xorbits/_mars/core/graph/builder/chunk.py:173, in Tiler._tile_handler(self, tileable)
171 tiled_tileables = [self._get_data(t) for t in tiled_tileables]
172 # start to tile
--> 173 tiled_tileables = yield from handler.tile(tiled_tileables)
174 return tiled_tileables
File ~/Workspace/xorbits/python/xorbits/_mars/core/entity/tileables.py:80, in OperandTilesHandler.tile(cls, tileables)
74 tile_handler = cls.get_handler(op)
75 if inspect.isgeneratorfunction(tile_handler):
76 # op.tile can be a generator function,
77 # each time an operand yield some chunks,
78 # they will be put into ChunkGraph and executed first.
79 # After execution, resume from the yield place.
---> 80 tiled_result = yield from tile_handler(op)
81 else:
82 # without iterative tiling
83 tiled_result = tile_handler(op)
File ~/Workspace/xorbits/python/xorbits/_mars/dataframe/base/rechunk.py:83, in DataFrameRechunk.tile(cls, op)
81 else:
82 inp = asindex(inp)
---> 83 chunk_size = _get_chunk_size(inp, op.chunk_size)
84 if chunk_size == inp.nsplits:
85 return [inp]
File ~/Workspace/xorbits/python/xorbits/_mars/dataframe/base/rechunk.py:190, in _get_chunk_size(a, chunk_size)
188 itemsize = max(getattr(dt, "itemsize", 8) for dt in a.dtypes)
189 else:
--> 190 itemsize = a.dtype.itemsize
191 return get_nsplits(a, chunk_size, itemsize)
AttributeError: 'StringDtype' object has no attribute 'itemsize'