
Multi-level partitioned Dataset with filters on rows and columns using PyArrow

Harsh-Maheshwari opened this issue 3 years ago • 5 comments

Description

A multi-level partitioned dataset with filters on rows and columns, to easily extract only the required data from a data store.

Context

Easily add a multi-level partitioned dataset.

Possible Implementation

from copy import deepcopy
from typing import Any, Dict
from pathlib import PurePosixPath
from kedro.io import AbstractDataSet
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


class PyarrowParquetDataSet(AbstractDataSet):
    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {}  # type: Dict[str, Any]

    def __init__(self, folderpath: str, load_args: Dict[str, Any] = None, save_args: Dict[str, Any] = None):
        self._filepath = PurePosixPath(folderpath)

        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

    def _load(self):
        # Return a ParquetDataset handle; rows can be pre-filtered via
        # ``filters`` in ``load_args``.
        dataset = pq.ParquetDataset(self._filepath, use_legacy_dataset=False, **self._load_args)
        return dataset

    def _save(self, df: pd.DataFrame) -> None:
        # Write the DataFrame as a (multi-level) partitioned Parquet dataset.
        table = pa.Table.from_pandas(df)
        pq.write_to_dataset(table, root_path=self._filepath, use_legacy_dataset=False, **self._save_args)

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset"""
        return dict(filepath=self._filepath, load_args=self._load_args, save_args=self._save_args)

Catalog.yml:

raw_store_2m:
  type: auto_trade.extras.datasets.pyarrow_parquet_dataset.PyarrowParquetDataSet
  folderpath: data/01_raw/ohlcv/
  load_args:
    filters:
  save_args:
    partition_cols: ['symbol','year','month']
    existing_data_behavior: overwrite_or_ignore
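
For reference, PyArrow expects `filters` as a list of `(column, operator, value)` predicates that are AND-ed together, so the empty `filters` key above could be filled in like this (values are hypothetical, matching the `partition_cols` in `save_args`):

  load_args:
    filters:
      - ['symbol', '=', 'AAPL']
      - ['year', '>=', 2021]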

Harsh-Maheshwari avatar Jun 18 '22 11:06 Harsh-Maheshwari

Hello @Harsh-Maheshwari, please could you explain a bit more here exactly what you're trying to do? Are you using dask or pandas ParquetDataSet? What functionality exactly are you trying to achieve? Have you considered using PartitionedDataSet?

antonymilne avatar Jun 27 '22 14:06 antonymilne

Hello @AntonyMilneQB

Please ignore the Possible Implementation I submitted before; the code has changed since then.

I have created a new DataSet that saves data in Parquet format.

  1. While saving, it takes partition_cols as a save_args parameter in catalog.yml
  • The partition_cols parameter partitions the data on multiple columns (as far as I know, the current PartitionedDataSet implementation only supports partitioning on a single column -- a limitation)
  • It can take a pandas DataFrame directly as node output and save it as a Parquet dataset
  2. While loading, it returns an Arrow Dataset reference
  • This lets us load the data lazily, only when we need it (already a feature of PartitionedDataSet)
  • It lets us filter on rows and columns from inside a node; these filters can be passed from parameters.yml (new feature -- see the sketch below)
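
A minimal sketch of how a node could consume the loaded Arrow dataset and apply those filters; the parameter names (`columns`, `filter_year`) are hypothetical:

import pyarrow.dataset as ds


def filter_node(dataset: ds.Dataset, params: dict):
    # `dataset` is the lazy Arrow Dataset returned by _load(); nothing is
    # read from disk until to_table() is called.
    table = dataset.to_table(
        columns=params["columns"],                          # column projection
        filter=ds.field("year") == params["filter_year"],   # row filter, pushed down to the scan
    )
    return table.to_pandas()

with, say, the following in parameters.yml:

filter_node:
  columns: ['open', 'close']
  filter_year: 2022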

PyarrowParquetDataSet (New DataSet)

from copy import deepcopy
from typing import Any, Dict
from pathlib import PurePosixPath
from kedro.io import AbstractDataSet
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

class PyarrowParquetDataSet(AbstractDataSet):
    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {}  # type: Dict[str, Any]

    def __init__(self, folderpath: str, load_args: Dict[str, Any] = None, save_args: Dict[str, Any] = None):
        self._filepath = PurePosixPath(folderpath)

        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

    def _load(self) -> ds.Dataset:
        # Return a lazy Arrow Dataset; data is only read when the node
        # materialises it (e.g. via to_table()).
        dataset = ds.dataset(self._filepath, format="parquet", **self._load_args)
        return dataset

    def _save(self, df: pd.DataFrame) -> None:
        table = pa.Table.from_pandas(df)
        pq.write_to_dataset(table, root_path=self._filepath, use_legacy_dataset=False, **self._save_args)

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset"""
        return dict(filepath=self._filepath, load_args=self._load_args, save_args=self._save_args)

Catalog.yml

df:
  type: df.extras.datasets.pyarrow_parquet_dataset.PyarrowParquetDataSet
  folderpath: data/01_raw/A/
  load_args:
    partitioning: hive
  save_args:
    partition_cols: ['symbol','year','month']
    existing_data_behavior: overwrite_or_ignore
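
For illustration, with partitioning: hive on load and those partition_cols on save, the on-disk layout would look something like this (values and file names are illustrative):

data/01_raw/A/
└── symbol=AAPL/
    └── year=2022/
        ├── month=5/part-0.parquet
        └── month=6/part-0.parquet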

Harsh-Maheshwari avatar Jun 29 '22 11:06 Harsh-Maheshwari

Hi @Harsh-Maheshwari thanks for your input here - we'd love to accept a PR for this, but we have some thoughts about the implementation:

  • The class should be called pyarrow.ParquetDataSet
  • The implementation should NOT be tightly coupled with Pandas

This could work neatly by adapting your implementation above to look a little like this:


class ParquetDataSet(AbstractDataSet):
    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {}  # type: Dict[str, Any]

    def __init__(
        self,
        folderpath: str,
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
        from_method: str = "pandas",
    ):
        """
        from_method: Can be one of `arrays`, `batches`, `pandas` or `pydict`,
        i.e. anything for which a `pa.Table.from_<method>` constructor exists.
        """
        self._filepath = PurePosixPath(folderpath)
        self._from_method = from_method  # stored so _save() can look up the constructor

        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

    def _load(self) -> ds.Dataset:
        dataset = ds.dataset(self._filepath, format="parquet", **self._load_args)
        return dataset

    def _save(self, data: Any) -> None:
        # Convert the incoming object with the matching pa.Table.from_* constructor,
        # so the dataset is not tightly coupled to pandas.
        writer = getattr(pa.Table, f"from_{self._from_method}")
        table = writer(data)
        pq.write_to_dataset(table, root_path=self._filepath, use_legacy_dataset=False, **self._save_args)

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset"""
        return dict(filepath=self._filepath, load_args=self._load_args, save_args=self._save_args)
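
For example, from_method: pydict would let a node return a plain dict of columns instead of a DataFrame; the catalog entry and node below are hypothetical:

trades:
  type: pyarrow.ParquetDataSet
  folderpath: data/02_intermediate/trades/
  from_method: pydict
  save_args:
    partition_cols: ['year']

# The node returns a dict of columns; _save() converts it via pa.Table.from_pydict()
def make_trades():
    return {'symbol': ['AAPL', 'MSFT'], 'year': [2022, 2022], 'price': [150.0, 260.0]}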


datajoely avatar Jul 11 '22 15:07 datajoely

Hi team, is there an open pull request for the above feature yet? If not, I can create one.

Harsh-Maheshwari avatar Oct 08 '22 17:10 Harsh-Maheshwari

@Harsh-Maheshwari , there's no open PR for this yet, so feel free to take this up! 😄

merelcht avatar Oct 10 '22 13:10 merelcht