kedro-plugins
pandas.DeltaTableDataset depends on pyarrow.Table.from_pandas to work properly
Description
I ran into problems with pandas.DeltaTableDataset when my nodes return a dataframe. E.g., running the code below results in the error "name 'sepal_width' present in the specified schema is not found in the columns or index", even though the column sepal_width is defined as nullable.
import pandas as pd
import pyarrow as pa
from kedro_datasets.pandas import DeltaTableDataset

# iris loaded with pd.read_csv() (any copy of the iris dataset will do)
iris = pd.read_csv('data/01_raw/iris.csv')

dataset = DeltaTableDataset(
    filepath='data/01_raw/delta_iris',
    save_args={
        'mode': 'overwrite',
        'schema': pa.schema([
            pa.field('sepal_length', pa.float64(), nullable=True),
            pa.field('sepal_width', pa.float64(), nullable=True),
            pa.field('petal_length', pa.float64(), nullable=True),
            pa.field('petal_width', pa.float64(), nullable=True),
            pa.field('species', pa.string(), nullable=False),
        ]),
        'overwrite_schema': True,
    },
)
dataset.save(iris.drop(columns=['sepal_width']))
I also had some problems related to the __index_level_0__ column when no schema was specified (see this issue).
Using pyarrow.Table.from_pandas(df) as the node's return value fixed both problems. Could this conversion be embedded into pandas.DeltaTableDataset in the next release of kedro-datasets?
Possible Implementation
Embed pyarrow.Table.from_pandas() inside the pandas.DeltaTableDataset.save() method.
Possible Alternatives
Call pyarrow.Table.from_pandas() on every node's return value.
Thanks for opening this, @julio-cmdr. I see you mention the iris dataset; can this be reproduced with something like https://github.com/kedro-org/kedro-starters/tree/main/pandas-iris then?
Yes, I think so! In the example above I just did pd.read_csv() to get the iris dataframe.
Regarding __index_level_0__, I have seen a case where it gets created when transcoding from pandas -> Spark with Parquet. By default pandas.CSVDataset uses index=False, but this is not consistent across the other pandas datasets (ParquetDataset etc.).