dask-sql
[BUG] Joins on GPU tables with multiple partitions fail
What happened:
Joining tables backed by dask_cudf dataframes with multiple partitions fails with AttributeError: 'Int64Index' object has no attribute '_get_attributes_dict'.
Minimal Complete Verifiable Example:
import cudf
import dask_cudf
from dask_sql import Context

# two small GPU tables sharing the join key "a"
df_a = cudf.DataFrame({'a': [1, 2, 3, 4, 5], 'b': [0] * 5})
df_b = cudf.DataFrame({'a': [5, 4, 3, 2, 1], 'c': [1] * 5})

# more than one partition is what triggers the failure
ddf_a = dask_cudf.from_cudf(df_a, npartitions=2)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=2)

c = Context()
c.create_table("dfa", ddf_a)
c.create_table("dfb", ddf_b)

print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())
throws:
Traceback
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Input In [2], in <cell line: 17>()
14 c.create_table("dfa", ddf_a)
15 c.create_table("dfb", ddf_b)
---> 17 print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
290 def compute(self, **kwargs):
291 """Compute this dask collection
292
293 This turns a lazy Dask collection into its in-memory equivalent.
(...)
312 dask.base.compute
313 """
--> 314 (result,) = compute(self, traverse=False, **kwargs)
315 return result
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/base.py:602, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
599 keys.append(x.__dask_keys__())
600 postcomputes.append(x.__dask_postcompute__())
--> 602 results = schedule(dsk, keys, **kwargs)
603 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:557, in get_sync(dsk, keys, **kwargs)
552 """A naive synchronous version of get_async
553
554 Can be useful for debugging.
555 """
556 kwargs.pop("num_workers", None) # if num_workers present, remove it
--> 557 return get_async(
558 synchronous_executor.submit,
559 synchronous_executor._max_workers,
560 dsk,
561 keys,
562 **kwargs,
563 )
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:500, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
498 while state["waiting"] or state["ready"] or state["running"]:
499 fire_tasks(chunksize)
--> 500 for key, res_info, failed in queue_get(queue).result():
501 if failed:
502 exc, tb = loads(res_info)
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
435 raise CancelledError()
436 elif self._state == FINISHED:
--> 437 return self.__get_result()
439 self._condition.wait(timeout)
441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
387 if self._exception:
388 try:
--> 389 raise self._exception
390 finally:
391 # Break a reference cycle with the exception in self._exception
392 self = None
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:542, in SynchronousExecutor.submit(self, fn, *args, **kwargs)
540 fut = Future()
541 try:
--> 542 fut.set_result(fn(*args, **kwargs))
543 except BaseException as e:
544 fut.set_exception(e)
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:238, in batch_execute_tasks(it)
234 def batch_execute_tasks(it):
235 """
236 Batch computing of multiple tasks with execute_task
237 """
--> 238 return [execute_task(*a) for a in it]
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:238, in <listcomp>(.0)
234 def batch_execute_tasks(it):
235 """
236 Batch computing of multiple tasks with execute_task
237 """
--> 238 return [execute_task(*a) for a in it]
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:229, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
227 failed = False
228 except BaseException as e:
--> 229 result = pack_exception(e, dumps)
230 failed = True
231 return key, result, failed
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
222 try:
223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
225 id = get_id()
226 result = dumps((result, id))
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/shuffle.py:968, in shuffle_group_3(df, col, npartitions, p)
966 g = df.groupby(col)
967 d = {i: g.get_group(i) for i in g.groups}
--> 968 p.append(d, fsync=True)
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/partd/encode.py:23, in Encode.append(self, data, **kwargs)
22 def append(self, data, **kwargs):
---> 23 data = valmap(self.encode, data)
24 data = valmap(frame, data)
25 self.partd.append(data, **kwargs)
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/toolz/dicttoolz.py:83, in valmap(func, d, factory)
72 """ Apply function to values of dictionary
73
74 >>> bills = {"Alice": [20, 15, 30], "Bob": [10, 35]}
(...)
80 itemmap
81 """
82 rv = factory()
---> 83 rv.update(zip(d.keys(), map(func, d.values())))
84 return rv
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/partd/pandas.py:181, in serialize(df)
176 """ Serialize and compress a Pandas DataFrame
177
178 Uses Pandas blocks, snappy, and blosc to deconstruct an array into bytes
179 """
180 col_header, col_bytes = index_to_header_bytes(df.columns)
--> 181 ind_header, ind_bytes = index_to_header_bytes(df.index)
182 headers = [col_header, ind_header]
183 bytes = [col_bytes, ind_bytes]
File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/partd/pandas.py:113, in index_to_header_bytes(ind)
110 cat = None
111 values = ind.values
--> 113 header = (type(ind), ind._get_attributes_dict(), values.dtype, cat)
114 bytes = pnp.compress(pnp.serialize(values), values.dtype)
115 return header, bytes
AttributeError: 'Int64Index' object has no attribute '_get_attributes_dict'
However, when the same query is performed with CPU dataframes:
import pandas as pd
import dask.dataframe as dd
from dask_sql import Context

df_a = pd.DataFrame({'a': [1, 2, 3, 4, 5], 'b': [0] * 5})
df_b = pd.DataFrame({'a': [5, 4, 3, 2, 1], 'c': [1] * 5})

ddf_a = dd.from_pandas(df_a, npartitions=2)
ddf_b = dd.from_pandas(df_b, npartitions=2)

c = Context()
c.create_table("dfa", ddf_a)
c.create_table("dfb", ddf_b)

print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())
Or GPU dataframes with a single partition:
import cudf
import dask_cudf
from dask_sql import Context

df_a = cudf.DataFrame({'a': [1, 2, 3, 4, 5], 'b': [0] * 5})
df_b = cudf.DataFrame({'a': [5, 4, 3, 2, 1], 'c': [1] * 5})

# a single partition avoids the shuffle, so the join succeeds
ddf_a = dask_cudf.from_cudf(df_a, npartitions=1)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=1)

c = Context()
c.create_table("dfa", ddf_a)
c.create_table("dfb", ddf_b)

print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())
the result is:
a b a0 c
0 1 0 1 1
0 4 0 4 1
1 5 0 5 1
2 2 0 2 1
3 3 0 3 1
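A possible workaround in the meantime (a sketch, untested against this exact release, and assuming this dask version still reads the top-level "shuffle" config key): force the in-memory task-based shuffle so the join never reaches the disk-based partd path shown in the traceback above. Reusing the Context from the reproducer:

import dask

# assumption: this dask release selects the shuffle backend from the
# top-level "shuffle" config key; "tasks" keeps shuffled partitions in
# memory instead of spilling them through partd's pandas serializer
with dask.config.set(shuffle="tasks"):
    print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())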
Environment:
- dask-sql version: 2022.6.0
- dask/distributed version: 2022.6.1
- dask_cudf version: 22.08.00a220706
- Python version: 3.8.13
- Operating System: Ubuntu 18.04.4
- Install method: Source
This comes from partd. Solving this would also help enable groupby.apply with cudf-backed dask dataframes: https://github.com/rapidsai/cudf/issues/5755#issuecomment-976823896
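For context, a minimal sketch of that related groupby.apply case (a hypothetical example, not taken from the linked issue): any cudf-backed operation that routes through dask's disk shuffle hands cudf partitions to partd's pandas serializer and hits the same index code path:

import cudf
import dask
import dask_cudf

df = cudf.DataFrame({'key': [1, 2, 1, 2, 1], 'val': [1.0, 2.0, 3.0, 4.0, 5.0]})
ddf = dask_cudf.from_cudf(df, npartitions=2)

# groupby.apply shuffles by key; forcing the disk shuffle sends the cudf
# partitions through partd, which only knows how to serialize pandas
with dask.config.set(shuffle="disk"):
    out = ddf.groupby('key').apply(lambda g: g['val'].sum(), meta=('val', 'float64'))
    print(out.compute(scheduler="sync"))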
Partd no longer depends on the deprecated pandas index class, but the reproducer still throws the same exception.
Nice! Are you saying the traceback you're now seeing still references the removed internal pandas functionality? Do we possibly need to update a dependency pinning somewhere?
Oops, I missed a subtle difference in the trace:
File /opt/conda/envs/rapids/lib/python3.9/site-packages/partd/pandas.py:111, in index_to_header_bytes(ind)
108 cat = None
109 values = ind.values
--> 111 header = (type(ind), {k: getattr(ind, k, None) for k in ind._attributes}, values.dtype, cat)
112 bytes = pnp.compress(pnp.serialize(values), values.dtype)
113 return header, bytes
AttributeError: 'Int64Index' object has no attribute '_attributes'
It looks like _attributes is also gone from Pandas indexes?
Do we possibly need to update a pinning somewhere
I don't think so; the latest partd version on PyPI is 1.3.0, which is what I have in my environment.
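In case it helps whoever picks this up, here is a minimal version-tolerant sketch of the attribute lookup that partd's index_to_header_bytes performs (an illustration only; index_attributes is a hypothetical helper, and it assumes the header only needs the kwargs required to rebuild the index, which for most plain pandas indexes is just name):

def index_attributes(ind):
    # pandas releases that still ship the private helper
    if hasattr(ind, "_get_attributes_dict"):
        return ind._get_attributes_dict()
    # pandas releases that expose the attribute list instead
    if hasattr(ind, "_attributes"):
        return {k: getattr(ind, k, None) for k in ind._attributes}
    # newer pandas (and cudf indexes, which never had either private
    # member): fall back to the one attribute needed to rebuild the index
    return {"name": ind.name}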