fugue icon indicating copy to clipboard operation
fugue copied to clipboard

[BUG] Issue reading hive partitioned dataset with NativeExecutionEngine

Open LaurentErreca opened this issue 3 years ago • 9 comments

I have a pandas dataframe with a column DAY representing the day number in month (ex : values from 1 to 31 for december)

from datetime import datetime
import pandas as pd

df = pd.DataFrame({'IBES': ['AAPL', 'AAPL', 'IBM', 'IBM'],
                   'EST_MEAN': [12.2, 10.0, 13.1, 13.5],
                   'EST_MEDIAN': [12.2, 12.0, 13.1, 13.2],
                   'BDATE': [datetime(2022, 1, 6), datetime(2022, 1, 7),
                             datetime(2022, 1, 6), datetime(2022, 1, 7)],
                   'DAY': [6, 7, 6, 7]
                  })

I save this dataframe with hive partition DAY

%%fsql

SELECT * FROM df
SAVE PREPARTITION BY DAY OVERWRITE PARQUET output_path

The result folder has a format similar to this: ! tree output_path output_path ├── DAY=6 │ └── 02b4a05c12fa4791aca2931e47659ecc.parquet └── DAY=7 └── bd17a05a5bd948cc824e4730fd03b473.parquet

When I try to read the dataset using spark execution engine, there is no problem

%%fsql spark

df_int = LOAD PARQUET output_path
SELECT * FROM df_int
PRINT df_int

But the same code fails using native execution engine.

The above exception was the direct cause of the following exception:

FugueDataFrameInitError                   Traceback (most recent call last)
/tmp/ipykernel_7378/4230927618.py in <module>
----> 1 get_ipython().run_cell_magic('fsql', '', "\ndf_int = LOAD PARQUET output_path\nSELECT * FROM df_int\nPRINT df_int\n")

~/.conda/envs/fugue/lib/python3.8/site-packages/IPython/core/interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
   2404             with self.builtin_trap:
   2405                 args = (magic_arg_s, cell)
-> 2406                 result = fn(*args, **kwargs)
   2407             return result
   2408 

~/.conda/envs/fugue/lib/python3.8/site-packages/decorator.py in fun(*args, **kw)
    230             if not kwsyntax:
    231                 args, kw = fix(args, kw, sig)
--> 232             return caller(func, *(extras + args), **kw)
    233     fun.__name__ = func.__name__
    234     fun.__doc__ = func.__doc__

~/.conda/envs/fugue/lib/python3.8/site-packages/IPython/core/magic.py in <lambda>(f, *a, **k)
    185     # but it's overkill for just that one bit of state.
    186     def magic_deco(arg):
--> 187         call = lambda f, *a, **k: f(*a, **k)
    188 
    189         if callable(arg):

~/dev/fugue/fugue_notebook/env.py in fsql(self, line, cell, local_ns)
     88         except FugueSQLSyntaxError as ex:
     89             raise FugueSQLSyntaxError(str(ex)).with_traceback(None) from None
---> 90         dag.run(self.get_engine(line, {} if local_ns is None else local_ns))
     91         for k, v in dag.yields.items():
     92             if isinstance(v, YieldedDataFrame):

~/dev/fugue/fugue/workflow/workflow.py in run(self, *args, **kwargs)
   1516                 if ctb is None:  # pragma: no cover
   1517                     raise
-> 1518                 raise ex.with_traceback(ctb)
   1519             self._computed = True
   1520         return DataFrames(

~/dev/fugue/fugue/dataframe/pandas_dataframe.py in __init__(self, df, schema, metadata, pandas_df_wrapper)
     77             self._native = pdf
     78         except Exception as e:
---> 79             raise FugueDataFrameInitError from e
     80 
     81     @property

FugueDataFrameInitError:

I also observed that when you specifiy the list of columns you want to read, and this does not include the partition column, else it works fine:

%%fsql

df_int = LOAD PARQUET output_path COLUMNS IBES,EST_MEDIAN,EST_MEAN,BDATE
SELECT * FROM df_int
PRINT df_int

Environment:

  • Backend: pandas
  • Backend version: 1.3.5
  • Python version: 3.8.12
  • OS: linux

LaurentErreca avatar Jan 09 '22 19:01 LaurentErreca

The problem is due to the dtype of the partition column which is set to pd.CategoricalDtype. By converting the dtype to str, the partitioned dataset is correctly read. The user will have to convert the type of the partition column if necessary.

Dirty hack to test this, in /triad/collections/schema.py, in function append (line232) where obj is a List:

    def append(self, obj: Any) -> "Schema":  # noqa: C901
        """Append schema like object to the current schema. Only new columns
        are allowed.

        :raises SchemaError: if a column exists or is invalid or obj is not convertible
        :return: the Schema object itself
        """
        try:
            if obj is None:
                return self
            elif isinstance(obj, pa.Field):
                self[obj.name] = obj.type
            elif isinstance(obj, str):
                self._append_pa_schema(expression_to_schema(obj))
            elif isinstance(obj, Dict):
                for k, v in obj.items():
                    self[k] = v
            elif isinstance(obj, pa.Schema):
                self._append_pa_schema(obj)
            elif isinstance(obj, pd.DataFrame):
                self._append_pa_schema(PD_UTILS.to_schema(obj))
            elif isinstance(obj, Tuple):  # type: ignore
                self[obj[0]] = obj[1]
            elif isinstance(obj, List):
                import pandas
                for x in obj:
                    if isinstance(x, pandas.core.frame.DataFrame):
                        for col in x.columns:
                            if isinstance(x[col].dtype, pd.CategoricalDtype):
                                x[col] = x[col].astype(str)
                    self.append(x)
            else:
                raise SchemaError(f"Invalid schema to add {obj}")
            return self
        except SchemaError:
            raise
        except Exception as e:
            raise SchemaError(str(e))

LaurentErreca avatar Jan 10 '22 10:01 LaurentErreca

Ah, good catch. So in pyarrow, the dictionary is the categorical type. But the implementation can be very hard. Converting categorical to string may be a more practical way? I am not sure yet, maybe we should spend the effort to support pyarrow dictionary. I will need to think about it.

goodwanghan avatar Jan 10 '22 17:01 goodwanghan

Hi, I'm agree with you, it is better to try to support dictionary first, also, the value type in dictionary seems to be well inferred.

LaurentErreca avatar Jan 10 '22 18:01 LaurentErreca

What makes me think that pyarrow correctly infer types : When I tried with a column of type object (string values) as partition column, the error contains: SchemaError: pyarrow.Field<IBES: dictionary<values=string, indices=int8, ordered=0>> is not supported

Then I also tried with column containing integers: SchemaError: pyarrow.Field<DAY: dictionary<values=int64, indices=int8, ordered=0>> is not supported

LaurentErreca avatar Jan 10 '22 18:01 LaurentErreca

We need to add this from triad And then on Fugue

goodwanghan avatar Jan 10 '22 18:01 goodwanghan

And then we also need to add tons of unit tests and need to make it work for all backends

goodwanghan avatar Jan 10 '22 18:01 goodwanghan

We will try to solve it in https://github.com/fugue-project/fugue/issues/296

goodwanghan avatar Jan 25 '22 06:01 goodwanghan

I think that this issue cannot be closed with #306 because we still can't read hive partitioned dataset with native or Dask execution engine.

LaurentErreca avatar Apr 04 '22 08:04 LaurentErreca

Sorry, let me reopen

goodwanghan avatar Apr 12 '22 06:04 goodwanghan