kedro-plugins
Multi-level partitioned dataset with filters on rows and columns using PyArrow
Description
A multi-level partitioned dataset with filters on rows and columns, to easily extract only the required data from a data store.
Context
Easily add a multi-level partitioned dataset.
Possible Implementation
from copy import deepcopy
from typing import Any, Dict
from pathlib import PurePosixPath

from kedro.io import AbstractDataSet
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


class PyarrowParquetDataSet(AbstractDataSet):
    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {}  # type: Dict[str, Any]

    def __init__(self, folderpath: str, load_args: Dict[str, Any] = None, save_args: Dict[str, Any] = None):
        self._filepath = PurePosixPath(folderpath)
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

    def _load(self) -> pq.ParquetDataset:
        # Lazily open the partitioned store; row/column filters can be passed via load_args.
        dataset = pq.ParquetDataset(self._filepath, use_legacy_dataset=False, **self._load_args)
        return dataset

    def _save(self, df: pd.DataFrame) -> None:
        # Write the DataFrame as a (multi-level) partitioned Parquet dataset.
        table = pa.Table.from_pandas(df)
        pq.write_to_dataset(table, root_path=self._filepath, use_legacy_dataset=False, **self._save_args)

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset."""
        return dict(filepath=self._filepath, load_args=self._load_args, save_args=self._save_args)
Catalog.yml:
raw_store_2m:
  type: auto_trade.extras.datasets.pyarrow_parquet_dataset.PyarrowParquetDataSet
  folderpath: data/01_raw/ohlcv/
  load_args:
    filters:
  save_args:
    partition_cols: ['symbol', 'year', 'month']
    existing_data_behavior: overwrite_or_ignore
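For context, a node consuming this entry receives the pq.ParquetDataset object and only materialises data when it reads it. A minimal sketch, assuming hypothetical column names and node (not part of the original code):

import pandas as pd
import pyarrow.parquet as pq

def compute_monthly_stats(ohlcv: pq.ParquetDataset) -> pd.DataFrame:
    # Only the listed columns are materialised; any row filters were already
    # applied via load_args when the ParquetDataset was constructed.
    table = ohlcv.read(columns=["symbol", "year", "month", "close"])
    return table.to_pandas()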
Hello @Harsh-Maheshwari, please could you explain a bit more here exactly what you're trying to do? Are you using dask or pandas ParquetDataSet? What functionality exactly are you trying to achieve? Have you considered using PartitionedDataSet?
Hello @AntonyMilneQB
Please ignore the Possible Implementation that I submitted before; the code has changed since then.
I have created a new dataset that saves data in Parquet format.
- While saving, it takes in `partition_cols` as a `save_args` parameter in catalog.yml.
- The `partition_cols` parameter helps in partitioning the data on multiple columns (as far as I know, the current implementation of `PartitionedDataSet` allows partitioning on a single column only -- a limitation).
- It can directly take in a pandas DataFrame as a node output and save it as a Parquet file.
- While loading, it loads an Arrow Dataset (Reference).
- This allows us to lazily load the data when we need it (already a feature in `PartitionedDataSet`).
- It allows us to add a filter on rows and columns while inside a node; these filters can be passed from parameters.yml (new feature -- see the sketch after this list).
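To illustrate the row/column filtering mentioned in the last bullet, here is a minimal sketch of a node working on the loaded Arrow Dataset; the column names and parameter keys are made up for the example:

import pandas as pd
import pyarrow.dataset as ds

def load_symbol_slice(ohlcv: ds.Dataset, params: dict) -> pd.DataFrame:
    # Build a row filter from plain values, e.g. coming from parameters.yml.
    row_filter = (ds.field("symbol") == params["symbol"]) & (
        ds.field("year") >= params["start_year"]
    )
    # Only matching partitions/row groups and the listed columns are read.
    table = ohlcv.to_table(
        columns=["symbol", "year", "month", "close"], filter=row_filter
    )
    return table.to_pandas()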
PyarrowParquetDataSet (New DataSet)
from copy import deepcopy
from typing import Any, Dict
from pathlib import PurePosixPath

from kedro.io import AbstractDataSet
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds


class PyarrowParquetDataSet(AbstractDataSet):
    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {}  # type: Dict[str, Any]

    def __init__(self, folderpath: str, load_args: Dict[str, Any] = None, save_args: Dict[str, Any] = None):
        self._filepath = PurePosixPath(folderpath)
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

    def _load(self) -> ds.Dataset:
        # Open the partitioned store as an Arrow Dataset so rows and columns
        # can be filtered lazily inside a node.
        dataset = ds.dataset(self._filepath, format="parquet", **self._load_args)
        return dataset

    def _save(self, df: pd.DataFrame) -> None:
        # Write the DataFrame as a (multi-level) partitioned Parquet dataset.
        table = pa.Table.from_pandas(df)
        pq.write_to_dataset(table, root_path=self._filepath, use_legacy_dataset=False, **self._save_args)

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset."""
        return dict(filepath=self._filepath, load_args=self._load_args, save_args=self._save_args)
Catalog.yml
df:
  type: df.extras.datasets.pyarrow_parquet_dataset.PyarrowParquetDataSet
  folderpath: data/01_raw/A/
  load_args:
    partitioning: hive
  save_args:
    partition_cols: ['symbol', 'year', 'month']
    existing_data_behavior: overwrite_or_ignore
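For completeness, wiring such a node into a pipeline so that the filters come from parameters.yml could look roughly like this; the query_filters parameter key and the load_symbol_slice node are hypothetical, reusing the sketch above:

from kedro.pipeline import Pipeline, node

# parameters.yml (hypothetical):
#   query_filters:
#     symbol: AAPL
#     start_year: 2020

def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            node(
                func=load_symbol_slice,  # node sketch from the earlier comment
                inputs=["df", "params:query_filters"],
                outputs="df_filtered",
            )
        ]
    )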
Hi @Harsh-Maheshwari thanks for your input here - we'd love to accept a PR for this, but we have some thoughts about the implementation:
- The class should be called `pyarrow.ParquetDataSet`.
- The implementation should NOT be tightly coupled with Pandas.
This could work neatly by adapting your implementation above to look a little like this:
# (imports as in the implementation above)
class ParquetDataSet(AbstractDataSet):
    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {}  # type: Dict[str, Any]

    def __init__(
        self,
        folderpath: str,
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
        from_method: str = "pandas",
    ):
        """
        from_method: Can be one of `arrays`, `batches`, `pandas` or `pydict`
        """
        self._filepath = PurePosixPath(folderpath)
        self.from_method = from_method
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

    def _load(self) -> ds.Dataset:
        dataset = ds.dataset(self._filepath, format="parquet", **self._load_args)
        return dataset

    def _save(self, data: Any) -> None:
        # Pick the pa.Table constructor matching from_method,
        # e.g. pa.Table.from_pandas or pa.Table.from_pydict.
        writer = getattr(pa.Table, f"from_{self.from_method}")
        table = writer(data)
        pq.write_to_dataset(table, root_path=self._filepath, use_legacy_dataset=False, **self._save_args)

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset."""
        return dict(filepath=self._filepath, load_args=self._load_args, save_args=self._save_args)
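As a rough illustration of what the from_method dispatch buys (the column values below are made up): with from_method='pydict', a node could return a plain dict of columns and _save would convert it without going through pandas:

import pyarrow as pa

# What writer(data) does when from_method='pydict': a plain dict of
# column -> values becomes an Arrow Table, no pandas involved.
columns = {"symbol": ["AAPL", "AAPL"], "year": [2020, 2021], "close": [132.7, 177.6]}
table = pa.Table.from_pydict(columns)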
Hi team, is there any pull request open for the above feature? If not, I can create one.
@Harsh-Maheshwari, there's no open PR for this yet, so feel free to take this up! 😄