dask-sql
[DF] [ENH] Add series.isin support for predicate pushdown
Is your feature request related to a problem? Please describe.
With DataFusion, we now map SQL operations like column IN (val1, val2, val3) to series.isin. This operation does not currently support predicate pushdown.
Describe the solution you'd like
Add predicate pushdown support for the series.isin operation.
Describe alternatives you've considered
N/A
Additional context
N/A
Minor update: I started exploring this feature and found that we probably need to make a small change in dask.dataframe to make it easier to "regenerate" an isin layer. Right now, the isin implementation wraps the iterable values argument in delayed. As far as I can tell, we should be able to avoid this behavior for simple list/tuple arguments.
With the upstream isin method modified, I was able to add M.isin/dd._Frame.isin to the logic in dask-sql/dask_sql/physical/utils/filter.py to produce the following behavior:
import dask.dataframe as dd
from dask_sql.physical.utils.filter import attempt_predicate_pushdown
from dask.utils_test import hlg_layer
ddf = dd.read_parquet("testfile.parquet")
ddf = ddf[ddf["A"].isin([0,2,4,6,8])]
ddf2 = attempt_predicate_pushdown(ddf)
# Check filters
print(hlg_layer(ddf2.dask, "read-parquet").creation_info['kwargs']['filters'])
# Check compute
print("\n",ddf2.compute())
[[('A', 'in', (0, 2, 4, 6, 8))]]
A
0 0
1 2
2 4
3 6
4 8
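For anyone unfamiliar with the filters format printed above, here is a minimal pure-Python sketch of what a pushed-down `in` predicate means for a reader that has per-row-group min/max statistics. The helper names `isin_to_dnf` and `row_group_may_match` and the example statistics are hypothetical and exist only for illustration. This is not the dask-sql, Dask, or pyarrow implementation, just the general idea under those assumptions.

```python
from typing import Any, Dict, List, Sequence, Tuple

# One predicate is (column, operator, value).
# The outer list is a disjunction (OR); each inner list is a conjunction (AND).
Predicate = Tuple[str, str, Any]
DNF = List[List[Predicate]]


def isin_to_dnf(column: str, values: Sequence[Any]) -> DNF:
    """Hypothetical helper: express `column.isin(values)` as a DNF filter."""
    return [[(column, "in", tuple(values))]]


def row_group_may_match(stats: Dict[str, Tuple[Any, Any]], filters: DNF) -> bool:
    """Return False only when the min/max statistics prove no row can match."""

    def predicate_may_match(pred: Predicate) -> bool:
        column, op, value = pred
        if column not in stats:
            return True  # no statistics: the row group cannot be ruled out
        lo, hi = stats[column]
        if op == "in":
            return any(lo <= v <= hi for v in value)
        if op == "==":
            return lo <= value <= hi
        return True  # operators not modeled here are treated conservatively

    return any(all(predicate_may_match(p) for p in conj) for conj in filters)


filters = isin_to_dnf("A", [0, 2, 4, 6, 8])

# Assumed (min, max) statistics for column "A" in three row groups.
row_groups = [
    {"A": (0, 3)},
    {"A": (9, 15)},
    {"A": (5, 7)},
]

# Indices of the row groups that still need to be read.
kept = [i for i, stats in enumerate(row_groups) if row_group_may_match(stats, filters)]
print(filters)
print(kept)
```

In this sketch the second row group is skipped because none of the requested values fall inside its min/max range, which is the I/O saving that pushdown is meant to provide.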
Thanks for looking into this @rjzamora. We can update the implementation here once the Dask DataFrame PR makes it in.
> As far as I can tell, we should be able to avoid this behavior for simple list/tuple arguments.
It looks like I was wrong about this (see: https://github.com/dask/dask/pull/4727). Rather than removing the delayed usage in Dask, I may need to add a simple DelayedObject layer to Dask (so we get something other than a MaterializedLayer).