
Large amount of time spent on determining datashape

Open philippjfr opened this issue 5 years ago • 33 comments

When running an aggregation on a large dataset, a fairly large amount of time is spent in the dshape_from_dask function (somewhere in the region of 500-600ms). This is a major overhead when running the aggregation many times in a row, and it even dominates the actual aggregation time in many cases. Here is the output of a profiler running canvas.points on the census dataset:

         77200 function calls (75103 primitive calls) in 1.021 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    1.021    1.021 {built-in method builtins.exec}
        1    0.000    0.000    1.021    1.021 <string>:2(<module>)
        1    0.000    0.000    1.021    1.021 spaces.py:1093(__getitem__)
        1    0.000    0.000    1.020    1.020 spaces.py:890(_execute_callback)
        1    0.000    0.000    1.020    1.020 spaces.py:540(__call__)
        1    0.000    0.000    1.020    1.020 __init__.py:430(dynamic_operation)
        1    0.000    0.000    1.020    1.020 __init__.py:415(_process)
        1    0.000    0.000    1.020    1.020 operation.py:137(process_element)
      2/1    0.000    0.000    1.020    1.020 operation.py:113(_apply)
        1    0.000    0.000    1.019    1.019 datashader.py:782(_process)
     18/7    0.000    0.000    1.018    0.145 dimension.py:712(map)
        1    0.000    0.000    1.017    1.017 operation.py:146(__call__)
        1    0.000    0.000    0.998    0.998 datashader.py:416(_process)
        1    0.000    0.000    0.964    0.964 core.py:144(points)
        1    0.000    0.000    0.964    0.964 core.py:492(bypixel)
        5    0.000    0.000    0.897    0.179 local.py:405(get_async)
        3    0.000    0.000    0.897    0.299 threaded.py:33(get)
       24    0.000    0.000    0.894    0.037 local.py:150(queue_get)
       24    0.000    0.000    0.894    0.037 queue.py:147(get)
       12    0.000    0.000    0.893    0.074 threading.py:263(wait)
       96    0.893    0.009    0.893    0.009 {method 'acquire' of '_thread.lock' objects}
        1    0.066    0.066    0.491    0.491 utils.py:368(dshape_from_dask) <-----
      2/1    0.000    0.000    0.472    0.472 utils.py:51(__call__)
        1    0.000    0.000    0.471    0.471 dask.py:20(dask_pipeline)

You can see that almost half the time is spent in the dshape_from_dask function, and as you aggregate a smaller region this time dominates even more.
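
For reference, a rough way to collect a similar profile, going straight through canvas.points rather than through the HoloViews operation in the trace above (here df is assumed to be the persisted census dask dataframe, and the canvas size is just an example):

import cProfile
import datashader as ds

# df: the census dask dataframe, already loaded/persisted
cvs = ds.Canvas(plot_width=900, plot_height=525)

profiler = cProfile.Profile()
profiler.enable()
cvs.points(df, 'easting', 'northing')
profiler.disable()
profiler.print_stats(sort='cumulative')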

It would be great if this could be sped up in general, but it might be sufficient to allow passing in the datashape in cases where you are doing a lot of aggregation on the same datasource (e.g. when generating a tileset) and want to avoid this 0.5 second penalty each time.
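
Roughly what I have in mind, as a sketch (the caching wrapper is not existing datashader API; it just memoises the helper per dataframe object):

from datashader.utils import dshape_from_dask

_dshape_cache = {}

def cached_dshape(df):
    """Memoise dshape_from_dask per dask dataframe object (assumes the same
    instance stays alive and is reused across aggregations, e.g. per tileset)."""
    key = id(df)
    if key not in _dshape_cache:
        _dshape_cache[key] = dshape_from_dask(df)
    return _dshape_cache[key]

Wiring something like this in would still need bypixel to accept a precomputed datashape, which is what I'm proposing above.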

philippjfr avatar Aug 14 '18 16:08 philippjfr

Just hacked it locally and ran my tile generation code again; it's about 4x faster when I use the cached datashape instead of computing it each time.

philippjfr avatar Aug 14 '18 17:08 philippjfr

Cool! It would be great to have a PR allowing such caching.

I'm having trouble seeing why dshape_from_dask would be an expensive call:

def dshape_from_pandas_helper(col):
    """Return an object from datashape.coretypes given a column from a pandas
    dataframe.
    """
    ...

def dshape_from_pandas(df):
    return len(df) * datashape.Record([(k, dshape_from_pandas_helper(df[k]))
                                       for k in df.columns])

def dshape_from_dask(df):
    return datashape.var * dshape_from_pandas(df.head()).measure

Maybe X * datashape.Record is making X copies of something? It doesn't look like it from the repr, but I'm not sure:

>>> 3*datashape.Record([[int,int]])
dshape("3 * {"<class 'int'>": <class 'int'>}")

ETA: @jcrist verifies that datashape should indeed just be storing the number of copies, not duplicating anything that many times, so that's not the issue.
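
A quick check along those lines (illustrative only):

import datashape

# Multiplying by an int records a fixed count; the Record is not duplicated.
shape = 3 * datashape.Record([('x', datashape.int32)])
print(shape)          # e.g. 3 * {x: int32}
print(shape.measure)  # the Record measure, independent of the multiplier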

jbednar avatar Aug 14 '18 17:08 jbednar

@jcrist suggests that you might change dshape_from_dask in this way:

def dshape_from_dask(df):
    """Return a datashape.DataShape object given a dask dataframe."""
#    return datashape.var * dshape_from_pandas(df.head()).measure
    return datashape.var * dshape_from_pandas(df._meta).measure

If you are working out of core, this will definitely be an improvement, because it replaces something that needs to load the data into memory (df.head()) with something that doesn't (df._meta, which wasn't available when datashader was first written). I think for the census data you're working with data that's been .persist()ed, so I wouldn't have thought this would add up to any significant time, but even then it's worth a try. Can you see if that affects it?
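
To illustrate the difference with a toy frame (assuming dask.dataframe is installed):

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({'x': [0.0], 'y': [1.0]}), npartitions=1)

ddf.head()   # pulls the first rows, i.e. actually computes (part of) the data
ddf._meta    # an empty pandas DataFrame with the same columns/dtypes; no compute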

jbednar avatar Aug 14 '18 17:08 jbednar

Okay, I've now done some more investigation, and it seems to me this large bottleneck comes down to this optimization in bypixel:

# Avoid datashape.Categorical instantiation bottleneck
# by only retaining the necessary columns:
# https://github.com/bokeh/datashader/issues/396
if categorical_in_dtypes(source.dtypes.values):
    # Preserve column ordering without duplicates
    cols_to_keep = OrderedDict({col: False for col in source.columns})
    cols_to_keep[glyph.x] = True
    cols_to_keep[glyph.y] = True
    if hasattr(glyph, 'z'):
        cols_to_keep[glyph.z] = True
    if hasattr(agg, 'values'):
        for subagg in agg.values:
            if subagg.column is not None:
                cols_to_keep[subagg.column] = True
    elif agg.column is not None:
        cols_to_keep[agg.column] = True
    src = source[[col for col, keepit in cols_to_keep.items() if keepit]]
else:
    src = source

Dropping the unused categorical columns here results in significant overhead, which accounts for the 500ms; computing the datashape itself is actually fast. Making the suggested change (i.e. using dshape_from_pandas(df._meta).measure) does seem to avoid this overhead entirely, however, so I'll make a PR with that change shortly.

philippjfr avatar Aug 14 '18 18:08 philippjfr

Using dshape_from_pandas(df._meta).measure makes the first aggregation in census.ipynb almost 3X faster for me, which is very nice. But the categorical case create_image(*USA) then fails with KeyError: '__UNKNOWN_CATEGORIES__'; perhaps _meta doesn't include the actual categories?
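
If that's what's going on, it's presumably dask's placeholder for categories that haven't been computed yet; a sketch of the mechanism (this is dask internals, not datashader code, so the exact import location may vary by dask version):

from dask.dataframe.utils import UNKNOWN_CATEGORIES

print(UNKNOWN_CATEGORIES)  # '__UNKNOWN_CATEGORIES__'
# Calling ddf.categorize() (or series.cat.as_known()) computes the real
# categories, after which _meta should carry them.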

jbednar avatar Aug 14 '18 20:08 jbednar

Oh, I see Philipp is also reporting that in #634. @jcrist said he can suggest something.

jbednar avatar Aug 14 '18 20:08 jbednar

Do we have any specific cases where the optimization above actually saves time? In every instance I have tried it's at least partially responsible for the slowdown.

philippjfr avatar Aug 14 '18 20:08 philippjfr

I see an example in https://github.com/bokeh/datashader/issues/396 now, will see if I can reproduce any speedup.

philippjfr avatar Aug 14 '18 20:08 philippjfr

Right, I was going to suggest replicating the ones in #396. But I'm a bit confused -- can we perhaps bypass all these issues by always just making a new dataframe that has only the columns we are actually using, so that we aren't checking any others? I thought that making a new such dataframe is very cheap, so why not just always do that? So I guess my suggestion is, can we simply delete the test "if categorical_in_dtypes(source.dtypes.values)"?

jbednar avatar Aug 14 '18 20:08 jbednar

can we perhaps bypass all these issues by just making a new dataframe that has only the columns we are actually using, so that we aren't checking any others?

That's pretty much what the optimization is doing (but only dropping unused categorical dtype columns) and it's exactly what's causing the slowdown in the datashape discovery. Not sure why though.

philippjfr avatar Aug 14 '18 20:08 philippjfr

Sorry -- just edited my message above to clarify -- I meant to suggest avoiding the test at all and always doing what's inside the "if".

jbednar avatar Aug 14 '18 20:08 jbednar

Also just edited mine, but to summarize: the bit inside the if is exactly what's causing the slowdown. So always doing it would be bad, unless we can figure out why datashape discovery slows down so much when columns are dropped.

philippjfr avatar Aug 14 '18 20:08 philippjfr

Just to be very clear, this is the code:

    if categorical_in_dtypes(source.dtypes.values):
        # Preserve column ordering without duplicates
        cols_to_keep = OrderedDict({col: False for col in source.columns})
        cols_to_keep[glyph.x] = True
        cols_to_keep[glyph.y] = True
        if hasattr(glyph, 'z'):
            cols_to_keep[glyph.z] = True
        if hasattr(agg, 'values'):
            for subagg in agg.values:
                if subagg.column is not None:
                    cols_to_keep[subagg.column] = True
        elif agg.column is not None:
            cols_to_keep[agg.column] = True
        src = source[[col for col, keepit in cols_to_keep.items() if keepit]]
    else:
        src = source

I tried replacing the first line with "if True" and got a 2x speedup for both the single-aggregate and the categorical aggregate in census.ipynb. That suggests that when there are categoricals, half the time is spent just detecting them, right?

The question is then what happens when there are no categoricals and the else would have been activated. At least on the one example (clifford_attractor) that I had open, the time in that case also went down, by 20% or so, presumably the cost of the if statement. So can we not simply remove if categorical_in_dtypes(source.dtypes.values): and else: src=source?

jbednar avatar Aug 14 '18 21:08 jbednar

I tried replacing the first line with "if True" and got a 2x speedup for both the single-aggregate and the categorical aggregate in census.ipynb. That suggests that when there are categoricals, half the time is spent just detecting them, right?

I observed exactly the opposite pattern: if I change it to if False I get a 2x speedup, at least in the count_cat case, and that saving seems to come from dshape_from_dask being much faster.

philippjfr avatar Aug 14 '18 21:08 philippjfr

How is that the opposite pattern? Seems like the same thing, in that I thought "categorical_in_dtypes(source.dtypes.values)" is the expensive bit, and both if True and if False avoid that bit. if True also ensures that we only deal with categoricals we actually use, so if we can always use (effectively) if True it seems we'll avoid the issue that #396 was addressing as well as the one you are raising.

jbednar avatar Aug 14 '18 21:08 jbednar

Because the profiler tells me it's dshape_from_dask that's faster and categorical_in_dtypes is very cheap whatever I do.

philippjfr avatar Aug 14 '18 21:08 philippjfr

When I use if True it consistently takes 1.2 seconds to aggregate; with if False I consistently get 0.55 seconds.

philippjfr avatar Aug 14 '18 21:08 philippjfr

Now I'm hopelessly confused; dshape_from_dask isn't even involved in that if statement; it's called regardless of any branches in that.

jbednar avatar Aug 14 '18 21:08 jbednar

Now I'm hopelessly confused; dshape_from_dask isn't even involved in that if statement; it's called regardless of any branches in that.

Yes, but that if branch creates a new dask dataframe with some columns dropped, which seems to cause some slowdown in the datashape detection.

philippjfr avatar Aug 14 '18 21:08 philippjfr

Definitely confused. I thought you were saying that the optimization was the issue, and so I tried both True and False and the original version of the optimization, with not much clear effect:

                if False:               if True:            if categorical_in_dtypes:
census-non-cat: 1.10  1.16  0.89 0.90   1.29  1.18  1.33    1.36 1.19 1.12
census-cat:     1.21  1.22  1.30 1.43   1.83  1.61  1.86    1.55 1.80 1.68

I'm on battery power, which isn't great for benchmarking, so maybe my results differ from yours. But at least in this run, all of these are in the same ballpark for the first line (the monochrome non-categorical aggregation). So I'm not seeing much effect from removing this optimization, and moreover none of it seems to have anything to do with dshape_from_dask.

jbednar avatar Aug 14 '18 21:08 jbednar

Here are my results (in ms):

                 if True:      if False:     if categorical_in_dtypes:
census-non-cat:  831 840 840   404 413 450   852 833 872
census-cat:      947 968 966   504 471 485   946 983 947

philippjfr avatar Aug 14 '18 22:08 philippjfr

Perhaps we should compare dask, pandas, numba and datashape versions:

>>> import pandas as pd
>>> import dask
>>> import numba
>>> import datashape
>>> print(numba.__version__)
0.37.0dev1+123.gd3f0f8a
>>> print(dask.__version__)
0.18.0
>>> print(pd.__version__)
0.23.1
>>> print(datashape.__version__)
0.5.4

philippjfr avatar Aug 14 '18 22:08 philippjfr

And here's the (truncated) output of %%prun:

if True:
      
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.055    0.055    0.636    0.636 utils.py:368(dshape_from_dask)

if False:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.002    0.002 utils.py:368(dshape_from_dask)

philippjfr avatar Aug 14 '18 22:08 philippjfr

>>> import pandas as pd
>>> import dask
>>> import numba
>>> import datashape
>>> print(numba.__version__)
0.39.0
>>> print(dask.__version__)
0.18.2
>>> print(pd.__version__)
0.22.0
>>> print(datashape.__version__)
0.5.4

jbednar avatar Aug 14 '18 23:08 jbednar

All of this is a bit crazy; the only thing we get out of dshape_from_dask is {easting: float32, northing: float32} (the result of dshape.measure). Taking half a second to figure that out seems scandalous.

jbednar avatar Aug 14 '18 23:08 jbednar

I've updated my pandas version to 0.23.4 now.

jbednar avatar Aug 15 '18 00:08 jbednar

Taking half a second to figure that out seems scandalous.

Remember dask defers execution; I'm almost certain it's the dropping of columns that's causing this when df.head() is called.
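
A quick way to test that hypothesis (df being the census dask dataframe; the column names are the ones from the census example):

import time

# Compare head() on the full frame vs. on a two-column subset; if the subset
# is much slower, the column drop is what head() ends up paying for.
for frame in (df, df[['easting', 'northing']]):
    start = time.time()
    frame.head()
    print(time.time() - start)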

philippjfr avatar Aug 15 '18 00:08 philippjfr

I'm going to have to give up on figuring this out for today, but it does seem like a bunch of wasted time somewhere.

jbednar avatar Aug 15 '18 00:08 jbednar

I just tested the example in #396, and my conclusion is that the optimization is consistently slower in every case when using dask, but does speed things up when using pandas. So I think we should simply disable the "optimization" in the dask case.
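
Something along these lines in bypixel, as a rough sketch rather than the final patch:

import dask.dataframe as dd

def should_drop_columns(source, has_categoricals):
    """Apply the column-dropping optimization only for pandas inputs,
    leaving dask dataframes untouched.

    `has_categoricals` would be the result of the existing
    categorical_in_dtypes(source.dtypes.values) check shown earlier."""
    return has_categoricals and not isinstance(source, dd.DataFrame)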

philippjfr avatar Aug 15 '18 00:08 philippjfr

Ok, thanks. Repurpose #634, I guess. But that seems like a problem to report for dask; using a subset of the columns on a dataframe is a pretty common thing to do...

jbednar avatar Aug 15 '18 01:08 jbednar