
[Python] Dataset join_asof not working on dictionary encoded column

Open mtofano opened this issue 1 year ago • 0 comments

Hi there,

I have a large on-disk dataset that I am creating as follows (where pa_path is a str and pa_fs is of type S3FileSystem):

import pyarrow as pa
import pyarrow.dataset as ds

dataset = ds.dataset(
    source=pa_path,
    filesystem=pa_fs,
    format="parquet",
    partitioning=ds.partitioning(
        schema=pa.schema(
            fields=[
                ("underlier_id", pa.uint64()),
                ("trade_date", pa.date32())
            ]
        ),
        flavor="filename"
    )
)

The schema of my dataset is as follows (omitting the full struct schema for data as it is rather large):

underlier_id: uint64
trade_date: date32[day]
as_of_time: timestamp[ns]
currency: dictionary<values=string, indices=int32, ordered=0>
data: struct

The as_of_time column contains data binned by minute, but a user query will not always match one of the available bins exactly. In those cases, I want to perform an as-of join so that we pull in the most recent available data.
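To make the intended semantics concrete, here is a minimal pure-Python sketch of a backward as-of lookup: for a query time, pick the latest bin at or before it, provided the bin is not too old. The function name asof_lookup, the max_lookback parameter, and the sample bin times are hypothetical and only for illustration. This is not pyarrow's implementation, and max_lookback is not the same thing as the tolerance argument of join_asof.

```python
import bisect
import datetime as dt


def asof_lookup(bin_times, query_time, max_lookback):
    """Return the latest bin time <= query_time, or None if there is none
    within max_lookback. bin_times must be sorted in ascending order."""
    # Index of the last bin that is not after the query time.
    idx = bisect.bisect_right(bin_times, query_time) - 1
    if idx < 0:
        return None  # every bin is later than the query
    candidate = bin_times[idx]
    if query_time - candidate > max_lookback:
        return None  # the closest earlier bin is too stale
    return candidate


# Hypothetical minute bins with a gap at 16:31.
bins = [dt.datetime(2024, 1, 5, 16, m) for m in (29, 30, 32)]
query = dt.datetime(2024, 1, 5, 16, 31)

match = asof_lookup(bins, query, dt.timedelta(minutes=5))
print(match)
```

With a lookback of zero, the same query finds no match, because only an exact hit on a bin would qualify.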

I use polars to build a Table populated with the desired underlier_id, trade_date, and as_of_time values, and then wrap it in an InMemoryDataset:

import datetime as dt
import polars as pl

as_of_df = pl.DataFrame(
    data={
        "underlier_id": 5135108,
        "trade_date": dt.date(2024, 1, 5),
        "as_of_time": dt.datetime(2024, 1, 5, 16, 31),
    },
    schema={
        "underlier_id": pl.UInt64,
        "trade_date": pl.Date,
        "as_of_time": pl.Datetime("ns")
    }
)

as_of_table = as_of_df.to_arrow()

as_of_ds = ds.InMemoryDataset(source=as_of_table)

Finally, I perform the join_asof as follows:

result = as_of_ds.join_asof(
    filtered_ds,
    on="as_of_time",
    by=["underlier_id", "trade_date"],
    tolerance=0
)

print(result.to_table())

However, this raises the following error:

---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
Cell In[24], line 1
----> 1 result = as_of_ds.join_asof(
      2     filtered_ds,
      3     on="as_of_time",
      4     by=["underlier_id", "trade_date"],
      5     tolerance=0
      6 )
      8 print(result.to_table())

File ~/.conda/envs/pog-vol/lib/python3.11/site-packages/pyarrow/_dataset.pyx:943, in pyarrow._dataset.Dataset.join_asof()

File ~/.conda/envs/pog-vol/lib/python3.11/site-packages/pyarrow/acero.py:333, in _perform_join_asof(left_operand, left_on, left_by, right_operand, right_on, right_by, tolerance, use_threads, output_type)
    326 join_opts = AsofJoinNodeOptions(
    327     left_on, left_by, right_on, right_by, tolerance
    328 )
    329 decl = Declaration(
    330     "asofjoin", options=join_opts, inputs=[left_source, right_source]
    331 )
--> 333 result_table = decl.to_table(use_threads=use_threads)
    335 if output_type == Table:
    336     return result_table

File ~/.conda/envs/pog-vol/lib/python3.11/site-packages/pyarrow/_acero.pyx:590, in pyarrow._acero.Declaration.to_table()

File ~/.conda/envs/pog-vol/lib/python3.11/site-packages/pyarrow/error.pxi:155, in pyarrow.lib.pyarrow_internal_check_status()

File ~/.conda/envs/pog-vol/lib/python3.11/site-packages/pyarrow/error.pxi:92, in pyarrow.lib.check_status()

ArrowInvalid: Unsupported type for data field currency : dictionary<values=string, indices=int32, ordered=0>

I am seeking some help to answer the following two questions:

  1. Am I using the join_asof feature correctly?
  2. If yes, why is this not working on the dictionary encoded column?

Thank you in advance for any and all help! Please let me know if more information is required.

Component(s)

Python

mtofano, Oct 21 '24 15:10