
Exception in function passed to `parallel_apply` results in obscure `EOFError`

Open codeandfire opened this issue 2 years ago • 3 comments

General

  • Operating System: Arch Linux (Linux kernel version 5.16.16)
  • Python version: 3.10.4
  • Pandas version: 1.4.1
  • Pandarallel version: 1.6.1

Acknowledgement

  • My issue is NOT present when using pandas alone (without pandarallel)

Bug description

Observed behavior

An exception in the function passed to parallel_apply results in an obscure error message

EOFError: Ran out of input

without any stack trace of the original exception.

Expected behavior

A clearer error message that includes the stack trace of the original exception raised in the worker.
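One possible approach (a sketch only, not pandarallel's actual internals; `wrap_user_function` and `unwrap` are hypothetical names): have each worker catch the exception, send it back in picklable form, and re-raise it in the main process with the worker-side traceback text attached.

```python
import traceback


def wrap_user_function(func):
    # Hypothetical worker-side helper: run the user function and capture
    # any exception instead of letting the worker die with a half-written
    # output file (which is what currently produces the EOFError).
    def wrapped(*args, **kwargs):
        try:
            return ("ok", func(*args, **kwargs))
        except Exception as exc:
            # Traceback objects are not picklable, so keep the formatted
            # text alongside the exception instance.
            return ("error", exc, traceback.format_exc())

    return wrapped


def unwrap(result):
    # Hypothetical main-process helper: re-raise with the worker-side
    # traceback included in the message, chained to the original exception.
    if result[0] == "error":
        _, exc, tb_text = result
        raise RuntimeError(f"worker failed:\n{tb_text}") from exc
    return result[1]
```

With this pattern, `unwrap(wrap_user_function(lambda a: a / 0)(1))` raises a `RuntimeError` whose message contains the original `ZeroDivisionError` traceback, instead of an unrelated `EOFError`.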

Minimal but working code sample to ease bug fix for pandarallel team

import pandas as pd
from pandarallel import pandarallel

data = pd.Series(list(range(1, 100000)))

pandarallel.initialize(progress_bar=True)
# INFO: Pandarallel will run on 8 workers.
# INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.

data.parallel_apply(lambda a: a / 0)

Currently the stack trace and error message are:

   0.00%                                          |        0 /    12500 |
   0.00%                                          |        0 /    12500 |
   0.00%                                          |        0 /    12500 |
   0.00%                                          |        0 /    12500 |
   0.00%                                          |        0 /    12500 |
   0.00%                                          |        0 /    12500 |
   0.00%                                          |        0 /    12500 |
   0.00%                                          |        0 /    12499 |
---------------------------------------------------------------------------
EOFError                                  Traceback (most recent call last)
Input In [5], in <cell line: 1>()
----> 1 data.parallel_apply(lambda a: a / 0)

File .../lib/python3.10/site-packages/pandarallel/core.py:324, in parallelize_with_memory_file_system.<locals>.closure(data, user_defined_function, *user_defined_function_args, **user_defined_function_kwargs)
    321             progress_bars.set_error(worker_index)
    322             progress_bars.update(progresses)
--> 324     return wrapped_reduce_function(
    325         (Path(output_file.name) for output_file in output_files),
    326         reduce_extra,
    327     )
    329 finally:
    330     for output_file in output_files:
    331         # When pandarallel stop supporting Python 3.7 and older, replace this
    332         # try/except clause by:
    333         # Path(output_file.name).unlink(missing_ok=True)

File .../lib/python3.10/site-packages/pandarallel/core.py:199, in wrap_reduce_function_for_file_system.<locals>.closure(output_file_paths, extra)
    192     return data
    194 dfs = (
    195     get_dataframe_and_delete_file(output_file_path)
    196     for output_file_path in output_file_paths
    197 )
--> 199 return reduce_function(dfs, extra)

File .../lib/python3.10/site-packages/pandarallel/data_types/series.py:34, in Series.Apply.reduce(datas, extra)
     32 @staticmethod
     33 def reduce(datas: Iterable[pd.Series], extra: Dict[str, Any]) -> pd.Series:
---> 34     return pd.concat(datas, copy=False)

File /usr/lib/python3.10/site-packages/pandas/util/_decorators.py:311, in deprecate_nonkeyword_arguments.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
    305 if len(args) > num_allow_args:
    306     warnings.warn(
    307         msg.format(arguments=arguments),
    308         FutureWarning,
    309         stacklevel=stacklevel,
    310     )
--> 311 return func(*args, **kwargs)

File /usr/lib/python3.10/site-packages/pandas/core/reshape/concat.py:346, in concat(objs, axis, join, ignore_index, keys, levels, names, verify_integrity, sort, copy)
    142 @deprecate_nonkeyword_arguments(version=None, allowed_args=["objs"])
    143 def concat(
    144     objs: Iterable[NDFrame] | Mapping[Hashable, NDFrame],
   (...)
    153     copy: bool = True,
    154 ) -> DataFrame | Series:
    155     """
    156     Concatenate pandas objects along a particular axis with optional set logic
    157     along the other axes.
   (...)
    344     ValueError: Indexes have overlapping values: ['a']
    345     """
--> 346     op = _Concatenator(
    347         objs,
    348         axis=axis,
    349         ignore_index=ignore_index,
    350         join=join,
    351         keys=keys,
    352         levels=levels,
    353         names=names,
    354         verify_integrity=verify_integrity,
    355         copy=copy,
    356         sort=sort,
    357     )
    359     return op.get_result()

File /usr/lib/python3.10/site-packages/pandas/core/reshape/concat.py:400, in _Concatenator.__init__(self, objs, axis, join, keys, levels, names, ignore_index, verify_integrity, copy, sort)
    398     objs = [objs[k] for k in keys]
    399 else:
--> 400     objs = list(objs)
    402 if len(objs) == 0:
    403     raise ValueError("No objects to concatenate")

File .../lib/python3.10/site-packages/pandarallel/core.py:195, in <genexpr>(.0)
    191     file_path.unlink()
    192     return data
    194 dfs = (
--> 195     get_dataframe_and_delete_file(output_file_path)
    196     for output_file_path in output_file_paths
    197 )
    199 return reduce_function(dfs, extra)

File .../lib/python3.10/site-packages/pandarallel/core.py:189, in wrap_reduce_function_for_file_system.<locals>.closure.<locals>.get_dataframe_and_delete_file(file_path)
    187 def get_dataframe_and_delete_file(file_path: Path) -> Any:
    188     with file_path.open("rb") as file_descriptor:
--> 189         data = pickle.load(file_descriptor)
    191     file_path.unlink()
    192     return data

EOFError: Ran out of input

Using pandas's apply instead of parallel_apply, the stack trace and error message are much more helpful:

In [6]: data.apply(lambda a: a / 0)
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)
Input In [6], in <cell line: 1>()
----> 1 data.apply(lambda a: a / 0)

File /usr/lib/python3.10/site-packages/pandas/core/series.py:4433, in Series.apply(self, func, convert_dtype, args, **kwargs)
   4323 def apply(
   4324     self,
   4325     func: AggFuncType,
   (...)
   4328     **kwargs,
   4329 ) -> DataFrame | Series:
   4330     """
   4331     Invoke function on values of Series.
   4332 
   (...)
   4431     dtype: float64
   4432     """
-> 4433     return SeriesApply(self, func, convert_dtype, args, kwargs).apply()

File /usr/lib/python3.10/site-packages/pandas/core/apply.py:1082, in SeriesApply.apply(self)
   1078 if isinstance(self.f, str):
   1079     # if we are a string, try to dispatch
   1080     return self.apply_str()
-> 1082 return self.apply_standard()

File /usr/lib/python3.10/site-packages/pandas/core/apply.py:1137, in SeriesApply.apply_standard(self)
   1131         values = obj.astype(object)._values
   1132         # error: Argument 2 to "map_infer" has incompatible type
   1133         # "Union[Callable[..., Any], str, List[Union[Callable[..., Any], str]],
   1134         # Dict[Hashable, Union[Union[Callable[..., Any], str],
   1135         # List[Union[Callable[..., Any], str]]]]]"; expected
   1136         # "Callable[[Any], Any]"
-> 1137         mapped = lib.map_infer(
   1138             values,
   1139             f,  # type: ignore[arg-type]
   1140             convert=self.convert_dtype,
   1141         )
   1143 if len(mapped) and isinstance(mapped[0], ABCSeries):
   1144     # GH#43986 Need to do list(mapped) in order to get treated as nested
   1145     #  See also GH#25959 regarding EA support
   1146     return obj._constructor_expanddim(list(mapped), index=obj.index)

File /usr/lib/python3.10/site-packages/pandas/_libs/lib.pyx:2870, in pandas._libs.lib.map_infer()

Input In [6], in <lambda>(a)
----> 1 data.apply(lambda a: a / 0)

ZeroDivisionError: division by zero

Thanks!

codeandfire avatar Apr 06 '22 14:04 codeandfire

Any resolution for this? Getting the same error

vikas-kph avatar Apr 10 '22 02:04 vikas-kph

Any resolution for this? Getting the same error

me too!

ruizgo avatar May 02 '22 10:05 ruizgo

Switching to pandarallel.initialize(use_memory_fs=False) is a suitable workaround for me.
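For reference, a minimal sketch of the changed initialization (same repro as in the issue; behavior observed on my setup, not verified across versions):

```python
from pandarallel import pandarallel

# Workaround: disable the in-memory file system so workers transfer data
# over pipes instead of pickled files. With this setting, in my testing,
# the worker's original exception (here a ZeroDivisionError) reaches the
# caller rather than the opaque "EOFError: Ran out of input".
pandarallel.initialize(use_memory_fs=False)
```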

pkel avatar Jul 27 '22 09:07 pkel