
[BUG] Joins on GPU tables with multiple partitions fail

Open ChrisJar opened this issue 3 years ago • 4 comments

What happened: Joining tables backed by multi-partition dask_cudf dataframes raises AttributeError: 'Int64Index' object has no attribute '_get_attributes_dict'

Minimal Complete Verifiable Example:

import numpy as np
import cudf
import dask_cudf
from dask_sql import Context

df_a = cudf.DataFrame({'a':[1, 2, 3, 4, 5], 'b':[0]*5})
df_b = cudf.DataFrame({'a':[5, 4, 3, 2, 1], 'c':[1]*5})

ddf_a = dask_cudf.from_cudf(df_a, npartitions=2)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=2)

c = Context()

c.create_table("dfa", ddf_a)
c.create_table("dfb", ddf_b)

print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())

throws:

Traceback

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Input In [2], in <cell line: 17>()
     14 c.create_table("dfa", ddf_a)
     15 c.create_table("dfb", ddf_b)
---> 17 print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
    290 def compute(self, **kwargs):
    291     """Compute this dask collection
    292
    293     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    312     dask.base.compute
    313     """
--> 314 (result,) = compute(self, traverse=False, **kwargs)
    315 return result

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/base.py:602, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    599     keys.append(x.__dask_keys__())
    600     postcomputes.append(x.__dask_postcompute__())
--> 602 results = schedule(dsk, keys, **kwargs)
    603 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:557, in get_sync(dsk, keys, **kwargs)
    552 """A naive synchronous version of get_async
    553
    554 Can be useful for debugging.
    555 """
    556 kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 557 return get_async(
    558     synchronous_executor.submit,
    559     synchronous_executor._max_workers,
    560     dsk,
    561     keys,
    562     **kwargs,
    563 )

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:500, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    498 while state["waiting"] or state["ready"] or state["running"]:
    499     fire_tasks(chunksize)
--> 500     for key, res_info, failed in queue_get(queue).result():
    501         if failed:
    502             exc, tb = loads(res_info)

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
    435     raise CancelledError()
    436 elif self._state == FINISHED:
--> 437     return self.__get_result()
    439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:542, in SynchronousExecutor.submit(self, fn, *args, **kwargs)
    540 fut = Future()
    541 try:
--> 542     fut.set_result(fn(*args, **kwargs))
    543 except BaseException as e:
    544     fut.set_exception(e)

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:238, in batch_execute_tasks(it)
    234 def batch_execute_tasks(it):
    235     """
    236     Batch computing of multiple tasks with execute_task
    237     """
--> 238     return [execute_task(*a) for a in it]

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:238, in <listcomp>(.0)
    234 def batch_execute_tasks(it):
    235     """
    236     Batch computing of multiple tasks with execute_task
    237     """
--> 238     return [execute_task(*a) for a in it]

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:229, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    227     failed = False
    228 except BaseException as e:
--> 229     result = pack_exception(e, dumps)
    230     failed = True
    231 return key, result, failed

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    222 try:
    223     task, data = loads(task_info)
--> 224     result = _execute_task(task, data)
    225     id = get_id()
    226     result = dumps((result, id))

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/shuffle.py:968, in shuffle_group_3(df, col, npartitions, p)
    966 g = df.groupby(col)
    967 d = {i: g.get_group(i) for i in g.groups}
--> 968 p.append(d, fsync=True)

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/partd/encode.py:23, in Encode.append(self, data, **kwargs)
     22 def append(self, data, **kwargs):
---> 23     data = valmap(self.encode, data)
     24     data = valmap(frame, data)
     25     self.partd.append(data, **kwargs)

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/toolz/dicttoolz.py:83, in valmap(func, d, factory)
     72 """ Apply function to values of dictionary
     73
     74 >>> bills = {"Alice": [20, 15, 30], "Bob": [10, 35]}
    (...)
     80 itemmap
     81 """
     82 rv = factory()
---> 83 rv.update(zip(d.keys(), map(func, d.values())))
     84 return rv

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/partd/pandas.py:181, in serialize(df)
    176 """ Serialize and compress a Pandas DataFrame
    177
    178 Uses Pandas blocks, snappy, and blosc to deconstruct an array into bytes
    179 """
    180 col_header, col_bytes = index_to_header_bytes(df.columns)
--> 181 ind_header, ind_bytes = index_to_header_bytes(df.index)
    182 headers = [col_header, ind_header]
    183 bytes = [col_bytes, ind_bytes]

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/partd/pandas.py:113, in index_to_header_bytes(ind)
    110     cat = None
    111 values = ind.values
--> 113 header = (type(ind), ind._get_attributes_dict(), values.dtype, cat)
    114 bytes = pnp.compress(pnp.serialize(values), values.dtype)
    115 return header, bytes

AttributeError: 'Int64Index' object has no attribute '_get_attributes_dict'
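For reference, the failing frame is partd's pandas codec: index_to_header_bytes builds its header from the private Index._get_attributes_dict() helper, which the cudf Int64Index in the shuffled partitions doesn't implement. A quick check (a sketch; cudf.Index stands in here for the Int64Index in the trace):

import cudf

# The private helper partd calls is not implemented on cudf index objects,
# which is exactly the AttributeError in the traceback above.
idx = cudf.Index([1, 2, 3])
print(hasattr(idx, "_get_attributes_dict"))  # False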

However, the same query succeeds when performed with CPU dataframes:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask_sql import Context

df_a = pd.DataFrame({'a':[1, 2, 3, 4, 5], 'b':[0]*5})
df_b = pd.DataFrame({'a':[5, 4, 3, 2, 1], 'c':[1]*5})

ddf_a = dd.from_pandas(df_a, npartitions=2)
ddf_b = dd.from_pandas(df_b, npartitions=2)

c = Context()

c.create_table("dfa", ddf_a)
c.create_table("dfb", ddf_b)

print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())

or with GPU dataframes in a single partition:

import numpy as np
import cudf
import dask_cudf
from dask_sql import Context

df_a = cudf.DataFrame({'a':[1, 2, 3, 4, 5], 'b':[0]*5})
df_b = cudf.DataFrame({'a':[5, 4, 3, 2, 1], 'c':[1]*5})

ddf_a = dask_cudf.from_cudf(df_a, npartitions=1)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=1)

c = Context()

c.create_table("dfa", ddf_a)
c.create_table("dfb", ddf_b)

print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())

and the result is:

   a  b  a0  c
0  1  0   1  1
0  4  0   4  1
1  5  0   5  1
2  2  0   2  1
3  3  0   3  1
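Until the partd path handles cudf indexes, a possible stopgap (a sketch following the single-partition behavior above, not a verified fix) is to repartition down to one partition before registering the tables:

# Hedged workaround sketch: single-partition inputs avoid the multi-partition
# disk shuffle, so partd's pandas-specific serializer is never reached.
c.create_table("dfa", ddf_a.repartition(npartitions=1))
c.create_table("dfb", ddf_b.repartition(npartitions=1))

This trades away join parallelism, so it only makes sense for small tables.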

Environment:

  • dask-sql version: 2022.6.0
  • dask/distributed version: 22.6.1
  • dask_cudf version: 22.08.00a220706
  • Python version: 3.8.13
  • Operating System: Ubuntu 18.04.4
  • Install method: Source

ChrisJar commented Jul 06 '22 19:07

This comes from partd. Solving this would also help enable groupby.apply with cudf-backed dask dataframes: https://github.com/rapidsai/cudf/issues/5755#issuecomment-976823896
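Since partd is only used by dask's disk-based shuffle, a task-based shuffle on the equivalent merge may sidestep it entirely; a minimal sketch, assuming dask's shuffle= keyword on merge (this bypasses dask-sql rather than fixing it):

import cudf
import dask_cudf

# Hedged sketch: request a task-based shuffle so partd (disk shuffle)
# is never invoked for the multi-partition join.
ddf_a = dask_cudf.from_cudf(cudf.DataFrame({"a": [1, 2, 3, 4, 5], "b": [0] * 5}), npartitions=2)
ddf_b = dask_cudf.from_cudf(cudf.DataFrame({"a": [5, 4, 3, 2, 1], "c": [1] * 5}), npartitions=2)

print(ddf_a.merge(ddf_b, on="a", how="inner", shuffle="tasks").compute())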

beckernick commented Jul 13 '22 18:07

> This comes from partd. Solving this would also help enable groupby.apply with cudf-backed dask dataframes: rapidsai/cudf#5755 (comment)

Partd no longer depends on the deprecated Pandas index class, but the reproducer still throws the same exception.

randerzander commented Oct 11 '22 17:10

Nice! Are you saying the traceback you're now seeing still references the removed internal pandas functionality? Do we possibly need to update a dependency pinning somewhere?

beckernick commented Oct 11 '22 18:10

Oops, I missed a subtle difference in the trace:

File /opt/conda/envs/rapids/lib/python3.9/site-packages/partd/pandas.py:111, in index_to_header_bytes(ind)
    108     cat = None
    109     values = ind.values
--> 111 header = (type(ind), {k: getattr(ind, k, None) for k in ind._attributes}, values.dtype, cat)
    112 bytes = pnp.compress(pnp.serialize(values), values.dtype)
    113 return header, bytes

AttributeError: 'Int64Index' object has no attribute '_attributes'

It looks like _attributes is also gone from Pandas indexes?
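If partd wanted to tolerate old pandas, new pandas, and cudf indexes alike, a fallback along these lines might work (a sketch, not partd's actual code; old pandas listed only "name" in Index._attributes):

def index_attrs(ind):
    # Use the private _attributes list when present (old pandas); otherwise
    # fall back to the one attribute that list used to contain.
    attrs = getattr(ind, "_attributes", ["name"])
    return {k: getattr(ind, k, None) for k in attrs}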

> Do we possibly need to update a pinning somewhere

I don't think so: the latest version of partd on PyPI is 1.3.0, which is what I have in my env.

randerzander commented Oct 12 '22 00:10