mars
mars copied to clipboard
[BUG] `mars.tensor.array_equal` raises an error when the input tensor's dtype is string
Describe the bug
`mars.tensor.array_equal` raises an error when the input tensor's dtype is string.
To Reproduce
To help us reproduce this bug, please provide the information below:
- Your Python version
- The version of Mars you use
- Versions of crucial packages, such as numpy, scipy and pandas
- Full stack of the error.
- Minimized code to reproduce the error.
In [12]: import mars
In [13]: import mars.tensor as mt
In [14]: mars.new_session()
Web service started at http://0.0.0.0:58497
Out[14]: <mars.deploy.oscar.session.SyncSession at 0x7ff688137880>
In [15]: a = mt.array(['a', 'b', 'c'])
In [16]: b = mt.array(['a', 'b', 'c'])
In [17]: mt.array_equal(a, b).execute()
0%| | 0/100 [00:00<?, ?it/s]Failed to run subtask G2H9d9WnwZFwUMDFKUweND1j on band numa-0
Traceback (most recent call last):
File "/Users/qinxuye/Workspace/mars/mars/services/scheduling/worker/execution.py", line 332, in internal_run_subtask
subtask_info.result = await self._retry_run_subtask(
File "/Users/qinxuye/Workspace/mars/mars/services/scheduling/worker/execution.py", line 433, in _retry_run_subtask
return await _retry_run(subtask, subtask_info, _run_subtask_once)
File "/Users/qinxuye/Workspace/mars/mars/services/scheduling/worker/execution.py", line 107, in _retry_run
raise ex
File "/Users/qinxuye/Workspace/mars/mars/services/scheduling/worker/execution.py", line 67, in _retry_run
return await target_async_func(*args)
File "/Users/qinxuye/Workspace/mars/mars/services/scheduling/worker/execution.py", line 375, in _run_subtask_once
return await asyncio.shield(aiotask)
File "/Users/qinxuye/Workspace/mars/mars/services/subtask/api.py", line 68, in run_subtask_in_slot
return await ref.run_subtask.options(profiling_context=profiling_context).send(
File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/context.py", line 189, in send
return self._process_result_message(result)
File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/context.py", line 70, in _process_result_message
raise message.as_instanceof_cause()
File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/pool.py", line 542, in send
result = await self._run_coro(message.message_id, coro)
File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/pool.py", line 333, in _run_coro
return await coro
File "/Users/qinxuye/Workspace/mars/mars/oscar/api.py", line 115, in __on_receive__
return await super().__on_receive__(message)
File "mars/oscar/core.pyx", line 506, in __on_receive__
raise ex
File "mars/oscar/core.pyx", line 500, in mars.oscar.core._BaseActor.__on_receive__
return await self._handle_actor_result(result)
File "mars/oscar/core.pyx", line 385, in _handle_actor_result
task_result = await coros[0]
File "mars/oscar/core.pyx", line 428, in _run_actor_async_generator
async with self._lock:
File "mars/oscar/core.pyx", line 430, in mars.oscar.core._BaseActor._run_actor_async_generator
'async_generator %r hold lock timeout', gen):
File "mars/oscar/core.pyx", line 434, in mars.oscar.core._BaseActor._run_actor_async_generator
res = await gen.athrow(*res)
File "/Users/qinxuye/Workspace/mars/mars/services/subtask/worker/runner.py", line 118, in run_subtask
result = yield self._running_processor.run(subtask)
File "mars/oscar/core.pyx", line 439, in mars.oscar.core._BaseActor._run_actor_async_generator
res = await self._handle_actor_result(res)
File "mars/oscar/core.pyx", line 359, in _handle_actor_result
result = await result
File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/context.py", line 189, in send
return self._process_result_message(result)
File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/context.py", line 70, in _process_result_message
raise message.as_instanceof_cause()
File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/pool.py", line 542, in send
result = await self._run_coro(message.message_id, coro)
File "/Users/qinxuye/Workspace/mars/mars/oscar/backends/pool.py", line 333, in _run_coro
return await coro
File "/Users/qinxuye/Workspace/mars/mars/oscar/api.py", line 115, in __on_receive__
return await super().__on_receive__(message)
File "mars/oscar/core.pyx", line 506, in __on_receive__
raise ex
File "mars/oscar/core.pyx", line 500, in mars.oscar.core._BaseActor.__on_receive__
return await self._handle_actor_result(result)
File "mars/oscar/core.pyx", line 385, in _handle_actor_result
task_result = await coros[0]
File "mars/oscar/core.pyx", line 428, in _run_actor_async_generator
async with self._lock:
File "mars/oscar/core.pyx", line 430, in mars.oscar.core._BaseActor._run_actor_async_generator
'async_generator %r hold lock timeout', gen):
File "mars/oscar/core.pyx", line 434, in mars.oscar.core._BaseActor._run_actor_async_generator
res = await gen.athrow(*res)
File "/Users/qinxuye/Workspace/mars/mars/services/subtask/worker/processor.py", line 610, in run
result = yield self._running_aio_task
File "mars/oscar/core.pyx", line 439, in mars.oscar.core._BaseActor._run_actor_async_generator
res = await self._handle_actor_result(res)
File "mars/oscar/core.pyx", line 359, in _handle_actor_result
result = await result
File "/Users/qinxuye/Workspace/mars/mars/services/subtask/worker/processor.py", line 457, in run
await self._execute_graph(chunk_graph)
File "/Users/qinxuye/Workspace/mars/mars/services/subtask/worker/processor.py", line 220, in _execute_graph
await to_wait
File "/Users/qinxuye/Workspace/mars/mars/lib/aio/_threads.py", line 36, in to_thread
return await loop.run_in_executor(None, func_call)
File "/Users/qinxuye/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/qinxuye/Workspace/mars/mars/core/mode.py", line 77, in _inner
return func(*args, **kwargs)
File "/Users/qinxuye/Workspace/mars/mars/services/subtask/worker/processor.py", line 188, in _execute_operand
return execute(ctx, op)
File "/Users/qinxuye/Workspace/mars/mars/core/operand/core.py", line 489, in execute
raise TypeError(str(e)).with_traceback(sys.exc_info()[2]) from None
File "/Users/qinxuye/Workspace/mars/mars/core/operand/core.py", line 485, in execute
result = executor(results, op)
File "/Users/qinxuye/Workspace/mars/mars/tensor/arithmetic/core.py", line 165, in execute
ret = cls._execute_cpu(op, xp, lhs, rhs, **kw)
File "/Users/qinxuye/Workspace/mars/mars/tensor/arithmetic/core.py", line 142, in _execute_cpu
return cls._get_func(xp)(lhs, rhs, **kw)
TypeError: [address=127.0.0.1:41564, pid=5771] ufunc 'equal' did not contain a loop with signature matching types (dtype('<U1'), dtype('<U1')) -> dtype('bool')
100%|█████████████████████████████████████| 100.0/100 [00:00<00:00, 1990.45it/s]
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-17-629343032f46> in <module>
----> 1 mt.array_equal(a, b).execute()
~/Workspace/mars/mars/core/entity/tileables.py in execute(self, session, **kw)
460
461 def execute(self, session=None, **kw):
--> 462 result = self.data.execute(session=session, **kw)
463 if isinstance(result, TILEABLE_TYPE):
464 return self
~/Workspace/mars/mars/core/entity/executable.py in execute(self, session, **kw)
96
97 session = _get_session(self, session)
---> 98 return execute(self, session=session, **kw)
99
100 def _check_session(self, session: SessionType, action: str):
~/Workspace/mars/mars/deploy/oscar/session.py in execute(tileable, session, wait, new_session_kwargs, show_progress, progress_update_interval, *tileables, **kwargs)
1801 session = get_default_or_create(**(new_session_kwargs or dict()))
1802 session = _ensure_sync(session)
-> 1803 return session.execute(
1804 tileable,
1805 *tileables,
~/Workspace/mars/mars/deploy/oscar/session.py in execute(self, tileable, show_progress, *tileables, **kwargs)
1599 fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
1600 try:
-> 1601 execution_info: ExecutionInfo = fut.result(
1602 timeout=self._isolated_session.timeout
1603 )
~/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
437 raise CancelledError()
438 elif self._state == FINISHED:
--> 439 return self.__get_result()
440 else:
441 raise TimeoutError()
~/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
386 def __get_result(self):
387 if self._exception:
--> 388 raise self._exception
389 else:
390 return self._result
~/Workspace/mars/mars/deploy/oscar/session.py in _execute(session, wait, show_progress, progress_update_interval, cancelled, *tileables, **kwargs)
1781 # set cancelled to avoid wait task leak
1782 cancelled.set()
-> 1783 await execution_info
1784 else:
1785 return execution_info
~/Workspace/mars/mars/deploy/oscar/session.py in wait()
102
103 async def wait():
--> 104 return await self._aio_task
105
106 self._future_local.future = fut = asyncio.run_coroutine_threadsafe(
~/Workspace/mars/mars/deploy/oscar/session.py in _run_in_background(self, tileables, task_id, progress, profiling)
916 )
917 if task_result.error:
--> 918 raise task_result.error.with_traceback(task_result.traceback)
919 if cancelled:
920 return
~/Workspace/mars/mars/services/scheduling/worker/execution.py in internal_run_subtask(self, subtask, band_name)
330
331 batch_quota_req = {(subtask.session_id, subtask.subtask_id): calc_size}
--> 332 subtask_info.result = await self._retry_run_subtask(
333 subtask, band_name, subtask_api, batch_quota_req
334 )
~/Workspace/mars/mars/services/scheduling/worker/execution.py in _retry_run_subtask(self, subtask, band_name, subtask_api, batch_quota_req)
431 # any exceptions occurred.
432 if subtask.retryable:
--> 433 return await _retry_run(subtask, subtask_info, _run_subtask_once)
434 else:
435 try:
~/Workspace/mars/mars/services/scheduling/worker/execution.py in _retry_run(subtask, subtask_info, target_async_func, *args)
105 )
106 else:
--> 107 raise ex
108
109
~/Workspace/mars/mars/services/scheduling/worker/execution.py in _retry_run(subtask, subtask_info, target_async_func, *args)
65 while True:
66 try:
---> 67 return await target_async_func(*args)
68 except (OSError, MarsError) as ex:
69 if subtask_info.num_retries < subtask_info.max_retries:
~/Workspace/mars/mars/services/scheduling/worker/execution.py in _run_subtask_once()
373 subtask_api.run_subtask_in_slot(band_name, slot_id, subtask)
374 )
--> 375 return await asyncio.shield(aiotask)
376 except asyncio.CancelledError as ex:
377 try:
~/Workspace/mars/mars/services/subtask/api.py in run_subtask_in_slot(self, band_name, slot_id, subtask)
66 ProfilingContext(task_id=subtask.task_id) if enable_profiling else None
67 )
---> 68 return await ref.run_subtask.options(profiling_context=profiling_context).send(
69 subtask
70 )
~/Workspace/mars/mars/oscar/backends/context.py in send(self, actor_ref, message, wait_response, profiling_context)
187 if wait_response:
188 result = await self._wait(future, actor_ref.address, message)
--> 189 return self._process_result_message(result)
190 else:
191 return future
~/Workspace/mars/mars/oscar/backends/context.py in _process_result_message(message)
68 return message.result
69 else:
---> 70 raise message.as_instanceof_cause()
71
72 async def _wait(self, future: asyncio.Future, address: str, message: _MessageBase):
~/Workspace/mars/mars/oscar/backends/pool.py in send()
540 raise ActorNotExist(f"Actor {actor_id} does not exist")
541 coro = self._actors[actor_id].__on_receive__(message.content)
--> 542 result = await self._run_coro(message.message_id, coro)
543 processor.result = ResultMessage(
544 message.message_id,
~/Workspace/mars/mars/oscar/backends/pool.py in _run_coro()
331 self._process_messages[message_id] = asyncio.tasks.current_task()
332 try:
--> 333 return await coro
334 finally:
335 self._process_messages.pop(message_id, None)
~/Workspace/mars/mars/oscar/api.py in __on_receive__()
113 Message shall be (method_name,) + args + (kwargs,)
114 """
--> 115 return await super().__on_receive__(message)
116
117
~/Workspace/mars/mars/oscar/core.pyx in __on_receive__()
504 debug_logger.exception('Got unhandled error when handling message %r '
505 'in actor %s at %s', message, self.uid, self.address)
--> 506 raise ex
507
508
~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor.__on_receive__()
498 raise ValueError(f'call_method {call_method} not valid')
499
--> 500 return await self._handle_actor_result(result)
501 except Exception as ex:
502 if _log_unhandled_errors:
~/Workspace/mars/mars/oscar/core.pyx in _handle_actor_result()
383 # asyncio.wait as it introduces much overhead
384 if len(coros) == 1:
--> 385 task_result = await coros[0]
386 if extract_tuple:
387 result = task_result
~/Workspace/mars/mars/oscar/core.pyx in _run_actor_async_generator()
426 res = None
427 while True:
--> 428 async with self._lock:
429 with debug_async_timeout('actor_lock_timeout',
430 'async_generator %r hold lock timeout', gen):
~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor._run_actor_async_generator()
428 async with self._lock:
429 with debug_async_timeout('actor_lock_timeout',
--> 430 'async_generator %r hold lock timeout', gen):
431 if not is_exception:
432 res = await gen.asend(res)
~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor._run_actor_async_generator()
432 res = await gen.asend(res)
433 else:
--> 434 res = await gen.athrow(*res)
435 try:
436 if _log_cycle_send:
~/Workspace/mars/mars/services/subtask/worker/runner.py in run_subtask()
116 try:
117 self._running_processor = self._last_processor = processor
--> 118 result = yield self._running_processor.run(subtask)
119 finally:
120 self._running_processor = None
~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor._run_actor_async_generator()
437 message_trace = pop_message_trace()
438
--> 439 res = await self._handle_actor_result(res)
440 is_exception = False
441 except:
~/Workspace/mars/mars/oscar/core.pyx in _handle_actor_result()
357
358 if inspect.isawaitable(result):
--> 359 result = await result
360 elif is_async_generator(result):
361 result = (result,)
~/Workspace/mars/mars/oscar/backends/context.py in send()
187 if wait_response:
188 result = await self._wait(future, actor_ref.address, message)
--> 189 return self._process_result_message(result)
190 else:
191 return future
~/Workspace/mars/mars/oscar/backends/context.py in _process_result_message()
68 return message.result
69 else:
---> 70 raise message.as_instanceof_cause()
71
72 async def _wait(self, future: asyncio.Future, address: str, message: _MessageBase):
~/Workspace/mars/mars/oscar/backends/pool.py in send()
540 raise ActorNotExist(f"Actor {actor_id} does not exist")
541 coro = self._actors[actor_id].__on_receive__(message.content)
--> 542 result = await self._run_coro(message.message_id, coro)
543 processor.result = ResultMessage(
544 message.message_id,
~/Workspace/mars/mars/oscar/backends/pool.py in _run_coro()
331 self._process_messages[message_id] = asyncio.tasks.current_task()
332 try:
--> 333 return await coro
334 finally:
335 self._process_messages.pop(message_id, None)
~/Workspace/mars/mars/oscar/api.py in __on_receive__()
113 Message shall be (method_name,) + args + (kwargs,)
114 """
--> 115 return await super().__on_receive__(message)
116
117
~/Workspace/mars/mars/oscar/core.pyx in __on_receive__()
504 debug_logger.exception('Got unhandled error when handling message %r '
505 'in actor %s at %s', message, self.uid, self.address)
--> 506 raise ex
507
508
~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor.__on_receive__()
498 raise ValueError(f'call_method {call_method} not valid')
499
--> 500 return await self._handle_actor_result(result)
501 except Exception as ex:
502 if _log_unhandled_errors:
~/Workspace/mars/mars/oscar/core.pyx in _handle_actor_result()
383 # asyncio.wait as it introduces much overhead
384 if len(coros) == 1:
--> 385 task_result = await coros[0]
386 if extract_tuple:
387 result = task_result
~/Workspace/mars/mars/oscar/core.pyx in _run_actor_async_generator()
426 res = None
427 while True:
--> 428 async with self._lock:
429 with debug_async_timeout('actor_lock_timeout',
430 'async_generator %r hold lock timeout', gen):
~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor._run_actor_async_generator()
428 async with self._lock:
429 with debug_async_timeout('actor_lock_timeout',
--> 430 'async_generator %r hold lock timeout', gen):
431 if not is_exception:
432 res = await gen.asend(res)
~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor._run_actor_async_generator()
432 res = await gen.asend(res)
433 else:
--> 434 res = await gen.athrow(*res)
435 try:
436 if _log_cycle_send:
~/Workspace/mars/mars/services/subtask/worker/processor.py in run()
608 self._running_aio_task = asyncio.create_task(processor.run())
609 try:
--> 610 result = yield self._running_aio_task
611 logger.info("Finished subtask: %s", subtask.subtask_id)
612 raise mo.Return(result)
~/Workspace/mars/mars/oscar/core.pyx in mars.oscar.core._BaseActor._run_actor_async_generator()
437 message_trace = pop_message_trace()
438
--> 439 res = await self._handle_actor_result(res)
440 is_exception = False
441 except:
~/Workspace/mars/mars/oscar/core.pyx in _handle_actor_result()
357
358 if inspect.isawaitable(result):
--> 359 result = await result
360 elif is_async_generator(result):
361 result = (result,)
~/Workspace/mars/mars/services/subtask/worker/processor.py in run()
455 try:
456 # execute chunk graph
--> 457 await self._execute_graph(chunk_graph)
458 finally:
459 # unpin inputs data
~/Workspace/mars/mars/services/subtask/worker/processor.py in _execute_graph()
218
219 try:
--> 220 await to_wait
221 logger.debug(
222 "Finish executing operand: %s," "chunk: %s, subtask id: %s",
~/Workspace/mars/mars/lib/aio/_threads.py in to_thread()
34 ctx = contextvars.copy_context()
35 func_call = functools.partial(ctx.run, func, *args, **kwargs)
---> 36 return await loop.run_in_executor(None, func_call)
~/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/thread.py in run()
55
56 try:
---> 57 result = self.fn(*self.args, **self.kwargs)
58 except BaseException as exc:
59 self.future.set_exception(exc)
~/Workspace/mars/mars/core/mode.py in _inner()
75 def _inner(*args, **kwargs):
76 with enter_mode(**mode_name_to_value):
---> 77 return func(*args, **kwargs)
78
79 else:
~/Workspace/mars/mars/services/subtask/worker/processor.py in _execute_operand()
186 self, ctx: Dict[str, Any], op: OperandType
187 ): # noqa: R0201 # pylint: disable=no-self-use
--> 188 return execute(ctx, op)
189
190 async def _execute_graph(self, chunk_graph: ChunkGraph):
~/Workspace/mars/mars/core/operand/core.py in execute()
487 return result
488 except UFuncTypeError as e: # pragma: no cover
--> 489 raise TypeError(str(e)).with_traceback(sys.exc_info()[2]) from None
490 except NotImplementedError:
491 for op_cls in type(op).__mro__:
~/Workspace/mars/mars/core/operand/core.py in execute()
483 # The `UFuncTypeError` was introduced by numpy#12593 since v1.17.0.
484 try:
--> 485 result = executor(results, op)
486 succeeded = True
487 return result
~/Workspace/mars/mars/tensor/arithmetic/core.py in execute()
163 ret = cls._execute_gpu(op, xp, lhs, rhs, **kw)
164 else:
--> 165 ret = cls._execute_cpu(op, xp, lhs, rhs, **kw)
166 ctx[op.outputs[0].key] = _handle_out_dtype(ret, op.dtype)
167
~/Workspace/mars/mars/tensor/arithmetic/core.py in _execute_cpu()
140 if kw.get("out") is not None:
141 kw["out"] = np.asarray(kw["out"])
--> 142 return cls._get_func(xp)(lhs, rhs, **kw)
143
144 @classmethod
TypeError: [address=127.0.0.1:41564, pid=5771] ufunc 'equal' did not contain a loop with signature matching types (dtype('<U1'), dtype('<U1')) -> dtype('bool')
The reason is that we delegate `a == b` to `mt.equal(a, b)`. However, in NumPy, the `np.equal` ufunc cannot handle string dtypes, whereas the `==` operator can:
In [6]: a = np.array(['a', 'b', 'c'])
In [7]: b = np.array(['a', 'b', 'c'])
In [8]: a == b
Out[8]: array([ True, True, True])
In [9]: np.equal(a, b)
---------------------------------------------------------------------------
UFuncTypeError Traceback (most recent call last)
<ipython-input-9-76fd4efa5344> in <module>
----> 1 np.equal(a, b)
UFuncTypeError: ufunc 'equal' did not contain a loop with signature matching types (dtype('<U1'), dtype('<U1')) -> dtype('bool')