
[BUG] `mars.tensor.array_equal` raises error when input tensor's dtype is string

Open qinxuye opened this issue 2 years ago • 0 comments

Describe the bug

mars.tensor.array_equal raises an error when the input tensors' dtype is string.

To Reproduce

To help us reproduce this bug, please provide the information below:

  1. Your Python version
  2. The version of Mars you use
  3. Versions of crucial packages, such as numpy, scipy and pandas
  4. Full stack of the error.
  5. Minimized code to reproduce the error.
In [12]: import mars

In [13]: import mars.tensor as mt

In [14]: mars.new_session()
Web service started at http://0.0.0.0:58497
Out[14]: <mars.deploy.oscar.session.SyncSession at 0x7ff688137880>

In [15]: a = mt.array(['a', 'b', 'c'])

In [16]: b = mt.array(['a', 'b', 'c'])

In [17]: mt.array_equal(a, b).execute()
  0%|                                                   | 0/100 [00:00<?, ?it/s]Failed to run subtask G2H9d9WnwZFwUMDFKUweND1j on band numa-0
Traceback (most recent call last):
  File "/Users/qinxuye/Workspace/mars/mars/services/scheduling/worker/execution.py", line 332, in internal_run_subtask
    subtask_info.result = await self._retry_run_subtask(
  File "/Users/qinxuye/Workspace/mars/mars/services/scheduling/worker/execution.py", line 433, in _retry_run_subtask
    return await _retry_run(subtask, subtask_info, _run_subtask_once)
  File "/Users/qinxuye/Workspace/mars/mars/services/scheduling/worker/execution.py", line 107, in _retry_run
    raise ex
  File "/Users/qinxuye/Workspace/mars/mars/services/scheduling/worker/execution.py", line 67, in _retry_run
    return await target_async_func(*args)
  File "/Users/qinxuye/Workspace/mars/mars/services/scheduling/worker/execution.py", line 375, in _run_subtask_once
    return await asyncio.shield(aiotask)
  File "/Users/qinxuye/Workspace/mars/mars/services/subtask/api.py", line 68, in run_subtask_in_slot
    return await ref.run_subtask.options(profiling_context=profiling_context).send(
  File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/context.py", line 189, in send
    return self._process_result_message(result)
  File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/context.py", line 70, in _process_result_message
    raise message.as_instanceof_cause()
  File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/pool.py", line 542, in send
    result = await self._run_coro(message.message_id, coro)
  File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/pool.py", line 333, in _run_coro
    return await coro
  File "/Users/qinxuye/Workspace/mars/mars/oscar/api.py", line 115, in __on_receive__
    return await super().__on_receive__(message)
  File "mars/oscar/core.pyx", line 506, in __on_receive__
    raise ex
  File "mars/oscar/core.pyx", line 500, in mars.oscar.core._BaseActor.__on_receive__
    return await self._handle_actor_result(result)
  File "mars/oscar/core.pyx", line 385, in _handle_actor_result
    task_result = await coros[0]
  File "mars/oscar/core.pyx", line 428, in _run_actor_async_generator
    async with self._lock:
  File "mars/oscar/core.pyx", line 430, in mars.oscar.core._BaseActor._run_actor_async_generator
    'async_generator %r hold lock timeout', gen):
  File "mars/oscar/core.pyx", line 434, in mars.oscar.core._BaseActor._run_actor_async_generator
    res = await gen.athrow(*res)
  File "/Users/qinxuye/Workspace/mars/mars/services/subtask/worker/runner.py", line 118, in run_subtask
    result = yield self._running_processor.run(subtask)
  File "mars/oscar/core.pyx", line 439, in mars.oscar.core._BaseActor._run_actor_async_generator
    res = await self._handle_actor_result(res)
  File "mars/oscar/core.pyx", line 359, in _handle_actor_result
    result = await result
  File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/context.py", line 189, in send
    return self._process_result_message(result)
  File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/context.py", line 70, in _process_result_message
    raise message.as_instanceof_cause()
  File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/pool.py", line 542, in send
    result = await self._run_coro(message.message_id, coro)
  File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/pool.py", line 333, in _run_coro
    return await coro
  File "/Users/qinxuye/Workspace/mars/mars/oscar/api.py", line 115, in __on_receive__
    return await super().__on_receive__(message)
  File "mars/oscar/core.pyx", line 506, in __on_receive__
    raise ex
  File "mars/oscar/core.pyx", line 500, in mars.oscar.core._BaseActor.__on_receive__
    return await self._handle_actor_result(result)
  File "mars/oscar/core.pyx", line 385, in _handle_actor_result
    task_result = await coros[0]
  File "mars/oscar/core.pyx", line 428, in _run_actor_async_generator
    async with self._lock:
  File "mars/oscar/core.pyx", line 430, in mars.oscar.core._BaseActor._run_actor_async_generator
    'async_generator %r hold lock timeout', gen):
  File "mars/oscar/core.pyx", line 434, in mars.oscar.core._BaseActor._run_actor_async_generator
    res = await gen.athrow(*res)
  File "/Users/qinxuye/Workspace/mars/mars/services/subtask/worker/processor.py", line 610, in run
    result = yield self._running_aio_task
  File "mars/oscar/core.pyx", line 439, in mars.oscar.core._BaseActor._run_actor_async_generator
    res = await self._handle_actor_result(res)
  File "mars/oscar/core.pyx", line 359, in _handle_actor_result
    result = await result
  File "/Users/qinxuye/Workspace/mars/mars/services/subtask/worker/processor.py", line 457, in run
    await self._execute_graph(chunk_graph)
  File "/Users/qinxuye/Workspace/mars/mars/services/subtask/worker/processor.py", line 220, in _execute_graph
    await to_wait
  File "/Users/qinxuye/Workspace/mars/mars/lib/aio/_threads.py", line 36, in to_thread
    return await loop.run_in_executor(None, func_call)
  File "/Users/qinxuye/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/qinxuye/Workspace/mars/mars/core/mode.py", line 77, in _inner
    return func(*args, **kwargs)
  File "/Users/qinxuye/Workspace/mars/mars/services/subtask/worker/processor.py", line 188, in _execute_operand
    return execute(ctx, op)
  File "/Users/qinxuye/Workspace/mars/mars/core/operand/core.py", line 489, in execute
    raise TypeError(str(e)).with_traceback(sys.exc_info()[2]) from None
  File "/Users/qinxuye/Workspace/mars/mars/core/operand/core.py", line 485, in execute
    result = executor(results, op)
  File "/Users/qinxuye/Workspace/mars/mars/tensor/arithmetic/core.py", line 165, in execute
    ret = cls._execute_cpu(op, xp, lhs, rhs, **kw)
  File "/Users/qinxuye/Workspace/mars/mars/tensor/arithmetic/core.py", line 142, in _execute_cpu
    return cls._get_func(xp)(lhs, rhs, **kw)
TypeError: [address=127.0.0.1:41564, pid=5771] ufunc 'equal' did not contain a loop with signature matching types (dtype('<U1'), dtype('<U1')) -> dtype('bool')
100%|█████████████████████████████████████| 100.0/100 [00:00<00:00, 1990.45it/s]
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-17-629343032f46> in <module>
----> 1 mt.array_equal(a, b).execute()

~/Workspace/mars/mars/core/entity/tileables.py in execute(self, session, **kw)
    460 
    461     def execute(self, session=None, **kw):
--> 462         result = self.data.execute(session=session, **kw)
    463         if isinstance(result, TILEABLE_TYPE):
    464             return self

~/Workspace/mars/mars/core/entity/executable.py in execute(self, session, **kw)
     96 
     97         session = _get_session(self, session)
---> 98         return execute(self, session=session, **kw)
     99 
    100     def _check_session(self, session: SessionType, action: str):

~/Workspace/mars/mars/deploy/oscar/session.py in execute(tileable, session, wait, new_session_kwargs, show_progress, progress_update_interval, *tileables, **kwargs)
   1801         session = get_default_or_create(**(new_session_kwargs or dict()))
   1802     session = _ensure_sync(session)
-> 1803     return session.execute(
   1804         tileable,
   1805         *tileables,

~/Workspace/mars/mars/deploy/oscar/session.py in execute(self, tileable, show_progress, *tileables, **kwargs)
   1599         fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
   1600         try:
-> 1601             execution_info: ExecutionInfo = fut.result(
   1602                 timeout=self._isolated_session.timeout
   1603             )

~/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    437                 raise CancelledError()
    438             elif self._state == FINISHED:
--> 439                 return self.__get_result()
    440             else:
    441                 raise TimeoutError()

~/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/Workspace/mars/mars/deploy/oscar/session.py in _execute(session, wait, show_progress, progress_update_interval, cancelled, *tileables, **kwargs)
   1781             # set cancelled to avoid wait task leak
   1782             cancelled.set()
-> 1783         await execution_info
   1784     else:
   1785         return execution_info

~/Workspace/mars/mars/deploy/oscar/session.py in wait()
    102 
    103             async def wait():
--> 104                 return await self._aio_task
    105 
    106             self._future_local.future = fut = asyncio.run_coroutine_threadsafe(

~/Workspace/mars/mars/deploy/oscar/session.py in _run_in_background(self, tileables, task_id, progress, profiling)
    916                     )
    917                 if task_result.error:
--> 918                     raise task_result.error.with_traceback(task_result.traceback)
    919             if cancelled:
    920                 return

~/Workspace/mars/mars/services/scheduling/worker/execution.py in internal_run_subtask(self, subtask, band_name)
    330 
    331             batch_quota_req = {(subtask.session_id, subtask.subtask_id): calc_size}
--> 332             subtask_info.result = await self._retry_run_subtask(
    333                 subtask, band_name, subtask_api, batch_quota_req
    334             )

~/Workspace/mars/mars/services/scheduling/worker/execution.py in _retry_run_subtask(self, subtask, band_name, subtask_api, batch_quota_req)
    431         #  any exceptions occurred.
    432         if subtask.retryable:
--> 433             return await _retry_run(subtask, subtask_info, _run_subtask_once)
    434         else:
    435             try:

~/Workspace/mars/mars/services/scheduling/worker/execution.py in _retry_run(subtask, subtask_info, target_async_func, *args)
    105                 )
    106             else:
--> 107                 raise ex
    108 
    109 

~/Workspace/mars/mars/services/scheduling/worker/execution.py in _retry_run(subtask, subtask_info, target_async_func, *args)
     65     while True:
     66         try:
---> 67             return await target_async_func(*args)
     68         except (OSError, MarsError) as ex:
     69             if subtask_info.num_retries < subtask_info.max_retries:

~/Workspace/mars/mars/services/scheduling/worker/execution.py in _run_subtask_once()
    373                     subtask_api.run_subtask_in_slot(band_name, slot_id, subtask)
    374                 )
--> 375                 return await asyncio.shield(aiotask)
    376             except asyncio.CancelledError as ex:
    377                 try:

~/Workspace/mars/mars/services/subtask/api.py in run_subtask_in_slot(self, band_name, slot_id, subtask)
     66             ProfilingContext(task_id=subtask.task_id) if enable_profiling else None
     67         )
---> 68         return await ref.run_subtask.options(profiling_context=profiling_context).send(
     69             subtask
     70         )

~/Workspace/mars/mars/oscar/backends/context.py in send(self, actor_ref, message, wait_response, profiling_context)
    187             if wait_response:
    188                 result = await self._wait(future, actor_ref.address, message)
--> 189                 return self._process_result_message(result)
    190             else:
    191                 return future

~/Workspace/mars/mars/oscar/backends/context.py in _process_result_message(message)
     68             return message.result
     69         else:
---> 70             raise message.as_instanceof_cause()
     71 
     72     async def _wait(self, future: asyncio.Future, address: str, message: _MessageBase):

~/Workspace/mars/mars/oscar/backends/pool.py in send()
    540                 raise ActorNotExist(f"Actor {actor_id} does not exist")
    541             coro = self._actors[actor_id].__on_receive__(message.content)
--> 542             result = await self._run_coro(message.message_id, coro)
    543             processor.result = ResultMessage(
    544                 message.message_id,

~/Workspace/mars/mars/oscar/backends/pool.py in _run_coro()
    331         self._process_messages[message_id] = asyncio.tasks.current_task()
    332         try:
--> 333             return await coro
    334         finally:
    335             self._process_messages.pop(message_id, None)

~/Workspace/mars/mars/oscar/api.py in __on_receive__()
    113             Message shall be (method_name,) + args + (kwargs,)
    114         """
--> 115         return await super().__on_receive__(message)
    116 
    117 

~/Workspace/mars/mars/oscar/core.pyx in __on_receive__()
    504                 debug_logger.exception('Got unhandled error when handling message %r '
    505                                        'in actor %s at %s', message, self.uid, self.address)
--> 506             raise ex
    507 
    508 

~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor.__on_receive__()
    498                 raise ValueError(f'call_method {call_method} not valid')
    499 
--> 500             return await self._handle_actor_result(result)
    501         except Exception as ex:
    502             if _log_unhandled_errors:

~/Workspace/mars/mars/oscar/core.pyx in _handle_actor_result()
    383             # asyncio.wait as it introduces much overhead
    384             if len(coros) == 1:
--> 385                 task_result = await coros[0]
    386                 if extract_tuple:
    387                     result = task_result

~/Workspace/mars/mars/oscar/core.pyx in _run_actor_async_generator()
    426             res = None
    427             while True:
--> 428                 async with self._lock:
    429                     with debug_async_timeout('actor_lock_timeout',
    430                                              'async_generator %r hold lock timeout', gen):

~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor._run_actor_async_generator()
    428                 async with self._lock:
    429                     with debug_async_timeout('actor_lock_timeout',
--> 430                                              'async_generator %r hold lock timeout', gen):
    431                         if not is_exception:
    432                             res = await gen.asend(res)

~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor._run_actor_async_generator()
    432                             res = await gen.asend(res)
    433                         else:
--> 434                             res = await gen.athrow(*res)
    435                 try:
    436                     if _log_cycle_send:

~/Workspace/mars/mars/services/subtask/worker/runner.py in run_subtask()
    116         try:
    117             self._running_processor = self._last_processor = processor
--> 118             result = yield self._running_processor.run(subtask)
    119         finally:
    120             self._running_processor = None

~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor._run_actor_async_generator()
    437                         message_trace = pop_message_trace()
    438 
--> 439                     res = await self._handle_actor_result(res)
    440                     is_exception = False
    441                 except:

~/Workspace/mars/mars/oscar/core.pyx in _handle_actor_result()
    357 
    358         if inspect.isawaitable(result):
--> 359             result = await result
    360         elif is_async_generator(result):
    361             result = (result,)

~/Workspace/mars/mars/oscar/backends/context.py in send()
    187             if wait_response:
    188                 result = await self._wait(future, actor_ref.address, message)
--> 189                 return self._process_result_message(result)
    190             else:
    191                 return future

~/Workspace/mars/mars/oscar/backends/context.py in _process_result_message()
     68             return message.result
     69         else:
---> 70             raise message.as_instanceof_cause()
     71 
     72     async def _wait(self, future: asyncio.Future, address: str, message: _MessageBase):

~/Workspace/mars/mars/oscar/backends/pool.py in send()
    540                 raise ActorNotExist(f"Actor {actor_id} does not exist")
    541             coro = self._actors[actor_id].__on_receive__(message.content)
--> 542             result = await self._run_coro(message.message_id, coro)
    543             processor.result = ResultMessage(
    544                 message.message_id,

~/Workspace/mars/mars/oscar/backends/pool.py in _run_coro()
    331         self._process_messages[message_id] = asyncio.tasks.current_task()
    332         try:
--> 333             return await coro
    334         finally:
    335             self._process_messages.pop(message_id, None)

~/Workspace/mars/mars/oscar/api.py in __on_receive__()
    113             Message shall be (method_name,) + args + (kwargs,)
    114         """
--> 115         return await super().__on_receive__(message)
    116 
    117 

~/Workspace/mars/mars/oscar/core.pyx in __on_receive__()
    504                 debug_logger.exception('Got unhandled error when handling message %r '
    505                                        'in actor %s at %s', message, self.uid, self.address)
--> 506             raise ex
    507 
    508 

~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor.__on_receive__()
    498                 raise ValueError(f'call_method {call_method} not valid')
    499 
--> 500             return await self._handle_actor_result(result)
    501         except Exception as ex:
    502             if _log_unhandled_errors:

~/Workspace/mars/mars/oscar/core.pyx in _handle_actor_result()
    383             # asyncio.wait as it introduces much overhead
    384             if len(coros) == 1:
--> 385                 task_result = await coros[0]
    386                 if extract_tuple:
    387                     result = task_result

~/Workspace/mars/mars/oscar/core.pyx in _run_actor_async_generator()
    426             res = None
    427             while True:
--> 428                 async with self._lock:
    429                     with debug_async_timeout('actor_lock_timeout',
    430                                              'async_generator %r hold lock timeout', gen):

~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor._run_actor_async_generator()
    428                 async with self._lock:
    429                     with debug_async_timeout('actor_lock_timeout',
--> 430                                              'async_generator %r hold lock timeout', gen):
    431                         if not is_exception:
    432                             res = await gen.asend(res)

~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor._run_actor_async_generator()
    432                             res = await gen.asend(res)
    433                         else:
--> 434                             res = await gen.athrow(*res)
    435                 try:
    436                     if _log_cycle_send:

~/Workspace/mars/mars/services/subtask/worker/processor.py in run()
    608         self._running_aio_task = asyncio.create_task(processor.run())
    609         try:
--> 610             result = yield self._running_aio_task
    611             logger.info("Finished subtask: %s", subtask.subtask_id)
    612             raise mo.Return(result)

~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor._run_actor_async_generator()
    437                         message_trace = pop_message_trace()
    438 
--> 439                     res = await self._handle_actor_result(res)
    440                     is_exception = False
    441                 except:

~/Workspace/mars/mars/oscar/core.pyx in _handle_actor_result()
    357 
    358         if inspect.isawaitable(result):
--> 359             result = await result
    360         elif is_async_generator(result):
    361             result = (result,)

~/Workspace/mars/mars/services/subtask/worker/processor.py in run()
    455             try:
    456                 # execute chunk graph
--> 457                 await self._execute_graph(chunk_graph)
    458             finally:
    459                 # unpin inputs data

~/Workspace/mars/mars/services/subtask/worker/processor.py in _execute_graph()
    218 
    219                 try:
--> 220                     await to_wait
    221                     logger.debug(
    222                         "Finish executing operand: %s," "chunk: %s, subtask id: %s",

~/Workspace/mars/mars/lib/aio/_threads.py in to_thread()
     34     ctx = contextvars.copy_context()
     35     func_call = functools.partial(ctx.run, func, *args, **kwargs)
---> 36     return await loop.run_in_executor(None, func_call)

~/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/thread.py in run()
     55 
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

~/Workspace/mars/mars/core/mode.py in _inner()
     75             def _inner(*args, **kwargs):
     76                 with enter_mode(**mode_name_to_value):
---> 77                     return func(*args, **kwargs)
     78 
     79         else:

~/Workspace/mars/mars/services/subtask/worker/processor.py in _execute_operand()
    186         self, ctx: Dict[str, Any], op: OperandType
    187     ):  # noqa: R0201  # pylint: disable=no-self-use
--> 188         return execute(ctx, op)
    189 
    190     async def _execute_graph(self, chunk_graph: ChunkGraph):

~/Workspace/mars/mars/core/operand/core.py in execute()
    487                 return result
    488             except UFuncTypeError as e:  # pragma: no cover
--> 489                 raise TypeError(str(e)).with_traceback(sys.exc_info()[2]) from None
    490     except NotImplementedError:
    491         for op_cls in type(op).__mro__:

~/Workspace/mars/mars/core/operand/core.py in execute()
    483             # The `UFuncTypeError` was introduced by numpy#12593 since v1.17.0.
    484             try:
--> 485                 result = executor(results, op)
    486                 succeeded = True
    487                 return result

~/Workspace/mars/mars/tensor/arithmetic/core.py in execute()
    163                     ret = cls._execute_gpu(op, xp, lhs, rhs, **kw)
    164                 else:
--> 165                     ret = cls._execute_cpu(op, xp, lhs, rhs, **kw)
    166                 ctx[op.outputs[0].key] = _handle_out_dtype(ret, op.dtype)
    167 

~/Workspace/mars/mars/tensor/arithmetic/core.py in _execute_cpu()
    140         if kw.get("out") is not None:
    141             kw["out"] = np.asarray(kw["out"])
--> 142         return cls._get_func(xp)(lhs, rhs, **kw)
    143 
    144     @classmethod

TypeError: [address=127.0.0.1:41564, pid=5771] ufunc 'equal' did not contain a loop with signature matching types (dtype('<U1'), dtype('<U1')) -> dtype('bool')

The cause is that Mars dispatches a == b to mt.equal(a, b), which ends up calling np.equal. In the NumPy version used here, np.equal has no ufunc loop for string dtypes, while the == operator does handle them:

In [6]: a = np.array(['a', 'b', 'c'])

In [7]: b = np.array(['a', 'b', 'c'])

In [8]: a == b
Out[8]: array([ True,  True,  True])

In [9]: np.equal(a, b)
---------------------------------------------------------------------------
UFuncTypeError                            Traceback (most recent call last)
<ipython-input-9-76fd4efa5344> in <module>
----> 1 np.equal(a, b)

UFuncTypeError: ufunc 'equal' did not contain a loop with signature matching types (dtype('<U1'), dtype('<U1')) -> dtype('bool')
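
The difference shown above suggests one possible direction for a fix. The following is only a minimal NumPy-level sketch, not the change actually made in Mars: the helper names `string_safe_equal` and `array_equal_any_dtype` are hypothetical, and it assumes NumPy 1.17 or later, where the failure surfaces as `UFuncTypeError` (a `TypeError` subclass). It tries `np.equal` first and falls back to the `==` operator when the ufunc has no matching loop.

```python
import numpy as np


def string_safe_equal(lhs, rhs):
    """Element-wise equality that also works for string dtypes.

    Hypothetical helper for illustration; not part of Mars or NumPy.
    """
    lhs, rhs = np.asarray(lhs), np.asarray(rhs)
    try:
        # Normal path: the ufunc covers numeric dtypes, and string dtypes
        # on NumPy versions that provide a string loop for it.
        return np.equal(lhs, rhs)
    except TypeError:
        # UFuncTypeError subclasses TypeError. Fall back to the `==`
        # operator, which the session above shows working for '<U1'.
        return lhs == rhs


def array_equal_any_dtype(a, b):
    """Return True if both arrays have the same shape and elements."""
    a, b = np.asarray(a), np.asarray(b)
    if a.shape != b.shape:
        return False
    return bool(string_safe_equal(a, b).all())


a = np.array(['a', 'b', 'c'])
b = np.array(['a', 'b', 'c'])
print(array_equal_any_dtype(a, b))
```

Whether such a fallback belongs in the operand's execute path or in array_equal itself is a design choice this sketch does not settle.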

qinxuye, Mar 17 '22 07:03