[BUG] cannot create blazingcontext for async dask_cudf clients
Describe the bug
Passing an asynchronous=True dask_cudf client to BlazingContext() throws an exception.
This is unfortunate because:
- Async is important for the software on top (apps, dashboards, ...): we're using dask_cudf + async clients to make the RAPIDS stack less of a bottleneck, so bsql calls break this benefit.
- Memory waste: for single-node (incl. multi-GPU), this means having to create 2 LocalCUDAClusters, since async clients need async clusters, and vice versa for sync (afaict!).
Steps/Code to reproduce bug

```python
import pandas as pd, pytest

pytestmark = pytest.mark.asyncio


@pytest.mark.timeout(30)
def test_blazing_sync_client():
    import cudf
    from blazingsql import BlazingContext
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    with LocalCUDACluster(
        asynchronous=False,
        dashboard_address=None,  # otherwise: raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: doc_js.js
        processes=True,  # multigpu
    ) as cluster:
        with Client(address=cluster, asynchronous=False) as client:
            bc = BlazingContext(client)
            pdf = pd.DataFrame({'a': [0, 1, 2, 3], 'b': [1, 2, 2, 3]})
            gdf = cudf.DataFrame.from_pandas(pdf)
            bc.create_table('bg_test_dot_product', gdf)
            dgdf2 = bc.sql('SELECT SUM(bg_test_dot_product.a) AS my_sum FROM bg_test_dot_product')
            gdf2 = dgdf2.compute()
            assert 6 == gdf2['my_sum'][0]


@pytest.mark.timeout(30)
async def test_blazing_async_client():
    import cudf
    from blazingsql import BlazingContext
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    async with await LocalCUDACluster(
        asynchronous=True,
        dashboard_address=None,  # otherwise: raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: doc_js.js
        processes=True,  # multigpu
    ) as cluster:
        async with await Client(address=cluster, asynchronous=True) as client:
            bc = BlazingContext(client)
            pdf = pd.DataFrame({'a': [0, 1, 2, 3], 'b': [1, 2, 2, 3]})
            gdf = cudf.DataFrame.from_pandas(pdf)
            bc.create_table('bg_test_dot_product', gdf)
            dgdf2 = bc.sql('SELECT SUM(bg_test_dot_product.a) AS my_sum FROM bg_test_dot_product')
            gdf2 = dgdf2.compute()
            assert 6 == gdf2['my_sum'][0]
```
The second test fails with:

```
test/server/test_blazing.py:66:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/conda/envs/rapids/lib/python3.7/site-packages/pyblazing/apiv2/context.py:1418: in __init__
    distributed_initialize_server_directory(self.dask_client, logging_dir_path)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

client = <Client: not connected>, dir_path = 'blazing_log'

    def distributed_initialize_server_directory(client, dir_path):
        # We are going to differentiate the two cases. When path is absolute,
        # we do the logging folder creation only once per host (server).
        # When path is relative, we have to group the workers according
        # to whether they have the same current working directory,
        # so, a unique folder will be created for each sub common cwd set.
        all_items = client.scheduler_info()["workers"].items()
        is_absolute_path = os.path.isabs(dir_path)
        import re
        if is_absolute_path:
            # Let's group the workers by host_name
            host_worker_dict = {}
            for worker, worker_info in all_items:
                host_name = re.findall(r"[0-9]+(?:\.[0-9]+){3}", worker)[0]
                if host_name not in host_worker_dict.keys():
                    host_worker_dict[host_name] = [worker]
                else:
                    host_worker_dict[host_name].append(worker)
            dask_futures = []
            for host_name, worker_list in host_worker_dict.items():
                dask_futures.append(
                    client.submit(
                        initialize_server_directory,
                        dir_path,
                        True,
                        workers=[worker_list[0]],
                        pure=False,
                    )
                )
            for connection in dask_futures:
                made_dir = connection.result()
                if not made_dir:
                    get_blazing_logger(is_dask=False).info("Directory already exists")
        else:
            # Let's get the current working directory of all workers
            dask_futures = []
            for worker, worker_info in all_items:
                dask_futures.append(
                    client.submit(get_current_directory_path, workers=[worker], pure=False)
                )
            current_working_dirs = client.gather(dask_futures)
            # Let's group the workers by host_name and by common cwd
            host_worker_dict = {}
>           for worker_key, cwd in zip(all_items, current_working_dirs):
E           TypeError: zip argument #2 must support iteration

/conda/envs/rapids/lib/python3.7/site-packages/pyblazing/apiv2/context.py:726: TypeError
```
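My guess at the root cause (not confirmed): with `asynchronous=True`, `Client.gather` returns a coroutine instead of a list, so the `zip(...)` inside `distributed_initialize_server_directory` has nothing to iterate over. A minimal illustration of that dask behavior, assuming a reachable `scheduler_address` (not BlazingSQL code):

```python
from dask.distributed import Client


async def illustrate_async_gather(scheduler_address: str) -> None:
    client = await Client(scheduler_address, asynchronous=True)
    futures = [client.submit(lambda: 1, pure=False)]

    results = client.gather(futures)  # with asynchronous=True this is a coroutine, not a list...
    # ...so sync-style code such as zip(workers, results) raises
    # "TypeError: zip argument #2 must support iteration".
    results = await results           # awaiting it yields the actual list
    assert results == [1]
    await client.close()
```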
Expected behavior
Both tests pass
Environment overview (please complete the following information)
Docker w/ CUDA 10.2 -> conda -> RAPIDS 0.17
----For BlazingSQL Developers----
Suspected source of the issue
Where and what are potential sources of the issue
Other design considerations
What components of the engine could be affected by this?
I'm wondering if there's a ~cheap workaround here, like an `async def proxiedLocalCUDAClusterClient() -> Client` that:
- returns an async dask client to an async dask cluster w/ 1 worker, in process mode
- has the side effect of creating a sync CUDA cluster + sync CUDA client on that worker
- exposes proxy stubs for conveniently calling `BlazingContext()`, `create_table`, `sql` (anything else?) on the worker, e.g. `dgdf = await client.run(bcProxy.sql(...))`

Or maybe it's not hard to get the context creator to use an async client. (Though maybe that's a deep assumption in bsql python, so we should use the stub approach?)
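Roughly the shape I have in mind, as an untested sketch: all helper names (`_init_bsql_on_worker`, `_sql_on_worker`, etc.) are made up for illustration, and nesting a second LocalCUDACluster inside the proxy worker may well not be workable in practice (per the caveat below):

```python
from dask.distributed import Client, get_worker
from dask_cuda import LocalCUDACluster


def _init_bsql_on_worker(scheduler_address=None):
    # Side effect on the proxy worker: a *sync* dask client (either to an
    # existing 'remote' cluster, or to a freshly nested LocalCUDACluster) and
    # a BlazingContext on top of it, stashed on the worker object.
    from blazingsql import BlazingContext
    if scheduler_address is None:
        scheduler_address = LocalCUDACluster(asynchronous=False, dashboard_address=None)
    client = Client(address=scheduler_address, asynchronous=False)
    get_worker()._bc = BlazingContext(client)


def _create_table_on_worker(name, gdf):
    get_worker()._bc.create_table(name, gdf)


def _sql_on_worker(query):
    # Compute inside the worker so only the result crosses back to the caller.
    return get_worker()._bc.sql(query).compute()


async def proxiedLocalCUDAClusterClient(scheduler_address=None) -> Client:
    # Async client to a 1-worker, process-mode cluster hosting the sync stack.
    cluster = await LocalCUDACluster(n_workers=1, processes=True, asynchronous=True,
                                     dashboard_address=None)
    client = await Client(address=cluster, asynchronous=True)
    await client.submit(_init_bsql_on_worker, scheduler_address, pure=False)
    return client

# usage sketch:
#   client = await proxiedLocalCUDAClusterClient()
#   gdf = await client.submit(_sql_on_worker, 'SELECT ...', pure=False)
```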
Thinking a bit more: the proxy isn't that great, because it still has the issue of 2 LocalCUDAClusters.
For our use case, we're moving to starting a separate CUDA cluster process (same-node) so multiple processes can hit it. I think the proxy solution will work then: async dask client -> sync in-worker dask client -> 'remote' cluster.
So we'll do sync for now, and once our cluster service is up, revisit some sort of async proxy. Our bsql calls are fairly thin -- roughly `async def run_query(query: str, table: Dict[str, Union[cudf.DataFrame, dask_cudf.DataFrame]]) -> Union[cudf.DataFrame, dask_cudf.DataFrame]` (sketched below) -- so am optimistic.
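A minimal sketch of that wrapper shape under today's sync-only constraint (the explicit `bc` parameter and the body are ours, for illustration):

```python
from typing import Dict, Union

import cudf
import dask_cudf
from blazingsql import BlazingContext


async def run_query(
    bc: BlazingContext,
    query: str,
    table: Dict[str, Union[cudf.DataFrame, dask_cudf.DataFrame]],
) -> Union[cudf.DataFrame, dask_cudf.DataFrame]:
    # Register each in-memory dataframe under its dict key...
    for name, df in table.items():
        bc.create_table(name, df)
    # ...then run the query; today this call blocks, which is the pain point above.
    return bc.sql(query)
```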
@felipeblazing The return_futures=True was just what I needed. AFAICT this makes the blazingsql calls non-blocking -- functional tests seem to pass, but not benchmarked:
```python
import logging
from typing import Union

import dask
import dask.dataframe
import dask_cudf
import cudf
from blazingsql import BlazingContext
from dask.distributed import Client

logger = logging.getLogger(__name__)

# pop_worker_query_part: worker-side helper (not shown here) that returns the
# finished query partition for a given part id.


async def collect_bsql_futures(sync_client: Client, bc: BlazingContext, dask_futures: list, as_gdf=False) -> Union[cudf.DataFrame, dask_cudf.DataFrame]:
    """
    Async collection of a bsql query, returning the original dgdf, or optionally, post-processed to a gdf.

    :param sync_client: dask.distributed.Client with asynchronous=False
    :param bc: BlazingContext with dask_cudf cluster
    :param dask_futures: Result of a bc.sql(..., return_futures=True) call
    :param as_gdf: Whether to return a cudf.DataFrame instead of a dask_cudf.DataFrame

    May also need 'dask.config.set(scheduler=gpu_client_sync)' calls

    Examples:
        dgdf = await collect_bsql_futures(sync_client, bc, bc.sql(qry, return_futures=True))
        gdf = await collect_bsql_futures(sync_client, bc, bc.sql(qry, return_futures=True), as_gdf=True)
    """
    from pyblazing.apiv2.context import distributed_remove_orc_files_from_disk

    logger.info('@collect_bsql_futures')
    #dask.config.set(scheduler=client)
    try:
        meta_results: list = await sync_client.gather(dask_futures, asynchronous=True)
        #meta_results: list = sync_client.gather(dask_futures)
        logger.debug('meta :: %s = %s', type(meta_results), meta_results)
    except Exception as e:
        logger.error('exn running query, cleaning up old orc file cache', exc_info=True)
        try:
            #FIXME somehow get txn id and plug in here?
            distributed_remove_orc_files_from_disk(sync_client, bc.cache_dir_path)
            logger.debug('... Cleaned up')
        except Exception as e2:
            logger.error('Failed cleanup of failed bsql query', exc_info=True)
        raise e

    futures: list = []
    for query_partids, meta, worker_id in meta_results:
        logger.info('meta results meta list item: %s', meta)
        for query_partid in query_partids:
            futures.append(sync_client.submit(
                pop_worker_query_part,
                query_partid,
                workers=[worker_id],
                pure=False))
    logger.debug('collected futures: %s', futures)

    result: dask_cudf.DataFrame = dask.dataframe.from_delayed(futures, meta=meta)
    logger.info('from_delayed result :: %s = %s', type(result), result)

    if as_gdf:
        #gdf2 = result.compute()
        [gdf2] = await sync_client.gather([result.compute(compute=False)], asynchronous=True)
        logger.debug('gdf2::%s', type(gdf2))
        logger.info('////collect_bsql_futures gdf')
        return gdf2
    else:
        logger.info('////collect_bsql_futures dgdf')
        return result
```
Passing tests:

```python
@pytest.mark.timeout(30)
def test_bsql_sync_warmup(bc, gpu_client_sync):
    assert True


@pytest.mark.timeout(30)
def test_bsql_async_warmup(gpu_client):
    assert True


@pytest.mark.timeout(30)
def test_bsql_default(bc, gpu_client_sync):
    dask.config.set(scheduler=gpu_client_sync)
    dgdf = bc.sql('SELECT SUM(bc_sample.a) AS my_sum FROM bc_sample')
    gdf = dgdf.compute()
    assert gdf['my_sum'][0] == 6


@pytest.mark.timeout(30)
async def test_collect_bsql_futures_dgdf(bc, gpu_client_sync, gpu_client):
    logger.debug('@test_collect_bsql_futures_dgdf')
    dask.config.set(scheduler=gpu_client_sync)
    dask_futures = bc.sql('SELECT SUM(bc_sample.a) AS my_sum FROM bc_sample', return_futures=True)
    logger.info('got futures: %s', dask_futures)
    dgdf2 = await collect_bsql_futures(gpu_client_sync, bc, dask_futures)
    logger.info('collected dgdf2: %s', dgdf2)
    gdf2 = dgdf2.compute()
    assert gdf2['my_sum'][0] == 6


@pytest.mark.timeout(30)
async def test_collect_bsql_futures_gdf(bc, gpu_client_sync, gpu_cluster):
    from server.util.dask_cudf import dask_cudf_init_helper_sync, make_cluster, UserClient

    async with await UserClient(f'async_{__name__}', allow_reset=False, address=gpu_cluster) as gpu_client:
        dask.config.set(scheduler=gpu_client_sync)
        dask_futures = bc.sql('SELECT SUM(bc_sample.a) AS my_sum FROM bc_sample', return_futures=True)
        gdf2 = await collect_bsql_futures(gpu_client_sync, bc, dask_futures, as_gdf=True)
        assert gdf2['my_sum'][0] == 6
```
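For context, the `bc` / `gpu_client_sync` fixtures used above are roughly of this shape (a simplified sketch of our test harness; `gpu_client` / `gpu_cluster` / `UserClient` are our own async variants, omitted here):

```python
import pandas as pd
import pytest
import cudf
from blazingsql import BlazingContext
from dask.distributed import Client
from dask_cuda import LocalCUDACluster


@pytest.fixture(scope="module")
def gpu_client_sync():
    # Sync cluster + client, mirroring the repro at the top of this issue
    with LocalCUDACluster(asynchronous=False, dashboard_address=None, processes=True) as cluster:
        with Client(address=cluster, asynchronous=False) as client:
            yield client


@pytest.fixture(scope="module")
def bc(gpu_client_sync):
    # BlazingContext over the sync client, preloaded with the small sample table the tests query
    bc = BlazingContext(gpu_client_sync)
    gdf = cudf.DataFrame.from_pandas(pd.DataFrame({'a': [0, 1, 2, 3], 'b': [1, 2, 2, 3]}))
    bc.create_table('bc_sample', gdf)
    return bc
```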
@lmeyerov I don't think we will be able to support asynchronous=True dask_cudf clients for a while. It's very non-trivial.
Additionally, a recent PR https://github.com/BlazingDB/blazingsql/pull/1289 has gotten rid of return_futures. Instead, we will soon be implementing a more native way to run concurrent queries using the same BlazingContext:
https://github.com/BlazingDB/blazingsql/issues/1290
Feel free to comment on the proposed APIs. We will be tackling this soon.
Thanks! For our use case, here's where we're currently at:
- [x] We're using a sync bsql client to a remote dask cluster
  - ... so async dask_cudf + sync bsql clients can now share GPU mem! Just gotta `publish`, similar to the above
- [x] Wrapping concurrent bsql calls in `dask.distributed.Lock` <- solved in RAPIDS 0.18
- [ ] bsql calls are still blocking :( Not a deal breaker b/c we have 4+ Python processes per GPU. I would think it wouldn't be so bad to solve for the remote dask worker case, but I didn't understand the sql entrypoint code well enough to make it obvious to me.
  - Interestingly, the `dask.distributed.Lock` at least gives async behavior for the queued-but-not-actively-processing bsql clients, b/c lock acquisition is async! (see the sketch after this list)
- [x] bsql handles bigger-than-memory datasets
- [ ] bsql cannot return bigger-than-memory dask_cudf dataframes <- imo, more important than fixing the blocking issue, b/c we can at least work around sync via threads/processes, but not bigger-than-memory
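A minimal sketch of that Lock wrapping, assuming an asynchronous dask client so that acquire()/release() return awaitables; the lock name "bsql" and the helper `run_query_locked` are ours, not a BlazingSQL API:

```python
from dask.distributed import Client, Lock


async def run_query_locked(async_client: Client, bc, query: str):
    # One cluster-wide lock name serializes concurrent bsql calls across processes.
    lock = Lock("bsql", client=async_client)
    # With an asynchronous client, acquire()/release() return awaitables, so
    # queued callers yield the event loop instead of blocking on the lock.
    await lock.acquire()
    try:
        # The bc.sql call itself still blocks the calling thread.
        return bc.sql(query)
    finally:
        await lock.release()
```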
FWIW, I'm about to get back to large-dataset testing w/ bsql b/c of our big file loader feature + customer prove-outs. I'm guessing more important will be stuff like making sure groupbys work, and ideally, that we can get big datasets back as part of a pipeline :)