[BUG] Issue reading hive partitioned dataset with NativeExecutionEngine
I have a pandas dataframe with a column DAY representing the day number in the month (e.g., values from 1 to 31 for December):
from datetime import datetime
import pandas as pd

df = pd.DataFrame({'IBES': ['AAPL', 'AAPL', 'IBM', 'IBM'],
                   'EST_MEAN': [12.2, 10.0, 13.1, 13.5],
                   'EST_MEDIAN': [12.2, 12.0, 13.1, 13.2],
                   'BDATE': [datetime(2022, 1, 6), datetime(2022, 1, 7),
                             datetime(2022, 1, 6), datetime(2022, 1, 7)],
                   'DAY': [6, 7, 6, 7]})
I save this dataframe with hive partitioning on the DAY column:
%%fsql
SELECT * FROM df
SAVE PREPARTITION BY DAY OVERWRITE PARQUET output_path
The result folder has a format similar to this:
! tree output_path
output_path
├── DAY=6
│ └── 02b4a05c12fa4791aca2931e47659ecc.parquet
└── DAY=7
└── bd17a05a5bd948cc824e4730fd03b473.parquet
When I read the dataset using the Spark execution engine, there is no problem:
%%fsql spark
df_int = LOAD PARQUET output_path
SELECT * FROM df_int
PRINT df_int
But the same code fails using the native execution engine:
The above exception was the direct cause of the following exception:
FugueDataFrameInitError Traceback (most recent call last)
/tmp/ipykernel_7378/4230927618.py in <module>
----> 1 get_ipython().run_cell_magic('fsql', '', "\ndf_int = LOAD PARQUET output_path\nSELECT * FROM df_int\nPRINT df_int\n")
~/.conda/envs/fugue/lib/python3.8/site-packages/IPython/core/interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
2404 with self.builtin_trap:
2405 args = (magic_arg_s, cell)
-> 2406 result = fn(*args, **kwargs)
2407 return result
2408
~/.conda/envs/fugue/lib/python3.8/site-packages/decorator.py in fun(*args, **kw)
230 if not kwsyntax:
231 args, kw = fix(args, kw, sig)
--> 232 return caller(func, *(extras + args), **kw)
233 fun.__name__ = func.__name__
234 fun.__doc__ = func.__doc__
~/.conda/envs/fugue/lib/python3.8/site-packages/IPython/core/magic.py in <lambda>(f, *a, **k)
185 # but it's overkill for just that one bit of state.
186 def magic_deco(arg):
--> 187 call = lambda f, *a, **k: f(*a, **k)
188
189 if callable(arg):
~/dev/fugue/fugue_notebook/env.py in fsql(self, line, cell, local_ns)
88 except FugueSQLSyntaxError as ex:
89 raise FugueSQLSyntaxError(str(ex)).with_traceback(None) from None
---> 90 dag.run(self.get_engine(line, {} if local_ns is None else local_ns))
91 for k, v in dag.yields.items():
92 if isinstance(v, YieldedDataFrame):
~/dev/fugue/fugue/workflow/workflow.py in run(self, *args, **kwargs)
1516 if ctb is None: # pragma: no cover
1517 raise
-> 1518 raise ex.with_traceback(ctb)
1519 self._computed = True
1520 return DataFrames(
~/dev/fugue/fugue/dataframe/pandas_dataframe.py in __init__(self, df, schema, metadata, pandas_df_wrapper)
77 self._native = pdf
78 except Exception as e:
---> 79 raise FugueDataFrameInitError from e
80
81 @property
FugueDataFrameInitError:
I also observed that if you specify the list of columns to read and that list does not include the partition column, it works fine:
%%fsql
df_int = LOAD PARQUET output_path COLUMNS IBES,EST_MEDIAN,EST_MEAN,BDATE
SELECT * FROM df_int
PRINT df_int
Environment:
- Backend: pandas
- Backend version: 1.3.5
- Python version: 3.8.12
- OS: linux
The problem is due to the dtype of the partition column, which is read back as pd.CategoricalDtype. By converting the dtype to str, the partitioned dataset is read correctly. The user will then have to convert the type of the partition column back if necessary.
A dirty hack to test this, in triad/collections/schema.py, in the append function (line 232), in the branch where obj is a List:
def append(self, obj: Any) -> "Schema":  # noqa: C901
    """Append schema like object to the current schema. Only new columns
    are allowed.

    :raises SchemaError: if a column exists or is invalid or obj is not convertible
    :return: the Schema object itself
    """
    try:
        if obj is None:
            return self
        elif isinstance(obj, pa.Field):
            self[obj.name] = obj.type
        elif isinstance(obj, str):
            self._append_pa_schema(expression_to_schema(obj))
        elif isinstance(obj, Dict):
            for k, v in obj.items():
                self[k] = v
        elif isinstance(obj, pa.Schema):
            self._append_pa_schema(obj)
        elif isinstance(obj, pd.DataFrame):
            self._append_pa_schema(PD_UTILS.to_schema(obj))
        elif isinstance(obj, Tuple):  # type: ignore
            self[obj[0]] = obj[1]
        elif isinstance(obj, List):
            for x in obj:
                if isinstance(x, pd.DataFrame):
                    # hack: cast categorical (hive partition) columns to str
                    for col in x.columns:
                        if isinstance(x[col].dtype, pd.CategoricalDtype):
                            x[col] = x[col].astype(str)
                self.append(x)
        else:
            raise SchemaError(f"Invalid schema to add {obj}")
        return self
    except SchemaError:
        raise
    except Exception as e:
        raise SchemaError(str(e))
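As a user-side workaround that avoids patching triad, one could cast categorical columns back to their category value type before handing the dataframe to Fugue (a hedged sketch; decategorize is a hypothetical helper name, not a Fugue or triad API):

```python
import pandas as pd

def decategorize(df: pd.DataFrame) -> pd.DataFrame:
    """Cast categorical columns (e.g. hive partition keys) back to plain dtypes."""
    out = df.copy()
    for col in out.columns:
        if isinstance(out[col].dtype, pd.CategoricalDtype):
            # the categories index carries the inferred value type (str, int64, ...)
            out[col] = out[col].astype(out[col].cat.categories.dtype)
    return out

demo = pd.DataFrame({"DAY": pd.Categorical([6, 7, 6])})
fixed = decategorize(demo)
print(fixed["DAY"].dtype)
```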
Ah, good catch. So in pyarrow, the dictionary type is the categorical type. But the implementation could be very hard. Converting categorical to string may be a more practical way? I am not sure yet; maybe we should spend the effort to support the pyarrow dictionary type. I will need to think about it.
Hi, I agree with you; it would be better to try to support the dictionary type first. Also, the value type in the dictionary seems to be well inferred.
What makes me think that pyarrow correctly infers the types: when I tried with a column of type object (string values) as the partition column, the error contains:
SchemaError: pyarrow.Field<IBES: dictionary<values=string, indices=int8, ordered=0>> is not supported
Then I also tried with a column containing integers:
SchemaError: pyarrow.Field<DAY: dictionary<values=int64, indices=int8, ordered=0>> is not supported
We need to add this support in triad first, and then in Fugue.
Then we also need to add tons of unit tests and make it work for all backends.
We will try to solve it in https://github.com/fugue-project/fugue/issues/296
I think that this issue cannot be closed by #306, because we still can't read a hive partitioned dataset with the native or Dask execution engines.
Sorry, let me reopen.