
FIX-#4597: Refactor Partition handling of func, args, kwargs

Open noloerino opened this issue 2 years ago • 3 comments

What do these changes do?

(picking up from #4600)

This introduces the Invocable object, which wraps the (func, *args, **kwargs) tuples that were previously passed around the codebase and allowed modin-internal arguments to mix with API-level ones.
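Roughly, the idea is a minimal wrapper like the sketch below (field names and the invoke helper are illustrative, not the exact implementation):

import dataclasses
from typing import Any, Callable


@dataclasses.dataclass(frozen=True)
class Invocable:
    # Bundle a function with its positional and keyword arguments so the
    # three pieces travel together, instead of as a bare (func, args, kwargs)
    # tuple whose contents are easy to mix up.
    func: Callable
    args: tuple = ()
    kwargs: dict = dataclasses.field(default_factory=dict)

    def invoke(self, *extra_args: Any) -> Any:
        # Illustrative: apply the stored call, optionally prepending
        # positional arguments supplied at call time (e.g. partition data).
        return self.func(*extra_args, *self.args, **self.kwargs)

An Invocable can then be shipped to a worker and applied there, keeping user-facing args/kwargs separate from modin-internal ones.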

  • [x] commit message follows format outlined here
  • [x] passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • [x] passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • [x] signed commit with git commit -s
  • [x] Resolves #4597
  • [ ] tests added and passing
  • [x] module layout described at docs/development/architecture.rst is up-to-date
  • [x] added (Issue Number: PR title (PR Number)) and github username to release notes for next major release

noloerino avatar Jul 25 '22 20:07 noloerino

Codecov Report

Merging #4715 (8808834) into master (d071b43) will increase coverage by 4.73%. The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master    #4715      +/-   ##
==========================================
+ Coverage   84.91%   89.64%   +4.73%     
==========================================
  Files         267      268       +1     
  Lines       19740    20027     +287     
==========================================
+ Hits        16762    17953    +1191     
+ Misses       2978     2074     -904     
Impacted Files Coverage Δ
...core/dataframe/base/partitioning/axis_partition.py 100.00% <ø> (ø)
...ns/pandas_on_ray/partitioning/virtual_partition.py 91.17% <ø> (-0.26%) :arrow_down:
...n/core/io/column_stores/column_store_dispatcher.py 96.00% <ø> (ø)
modin/core/io/column_stores/parquet_dispatcher.py 96.25% <ø> (+2.08%) :arrow_up:
modin/core/io/pickle/pickle_dispatcher.py 92.30% <ø> (ø)
modin/core/io/sql/sql_dispatcher.py 100.00% <ø> (ø)
modin/core/io/text/csv_glob_dispatcher.py 80.99% <ø> (ø)
modin/core/io/text/excel_dispatcher.py 94.01% <ø> (+0.85%) :arrow_up:
modin/core/io/text/json_dispatcher.py 97.67% <ø> (ø)
modin/core/io/text/text_file_dispatcher.py 97.42% <ø> (ø)
... and 56 more


codecov[bot] avatar Jul 25 '22 21:07 codecov[bot]

This pull request introduces 1 alert when merging 4985e007afe6997b5d88b0fa6276f3a3e056d116 into cfafbb254c221dd4f739a9cf5af17c9e8cdf13c3 - view on LGTM.com

new alerts:

  • 1 for Unused import

lgtm-com[bot] avatar Jul 27 '22 22:07 lgtm-com[bot]

Also, I was wondering if we could do preprocessing (ray.put / client.scatter) for the arguments to be passed into a Ray/Dask remote function? That may drastically improve performance.

From what I ran into while testing, there are some points in the codebase where the arguments are already Ray object IDs/Dask futures when they're passed into a function. I can try investigating further to see if this can be made uniform, perhaps such that Invocable or some other wrapper is always constructed with concrete Python objects as arguments, and then calls ray.put/client.scatter within its constructor.

noloerino avatar Aug 09 '22 23:08 noloerino

As for the performance penalties I mentioned earlier, you can see them in the following simple example.

if __name__ == "__main__":
    import modin.pandas as pd
    import numpy as np
    from timeit import default_timer as dt

    data = np.random.randint(0, 100, size=(2**16, 2**12))
    df = pd.DataFrame(data)
    start = dt()
    result = df.abs()
    end = dt()
    print("time =", end - start)

  • Ray, master: time = 0.024973500025225803
  • Ray, PR #4715: time = 0.047648999985540286
  • Dask, master: time = 0.4006148000189569
  • Dask, PR #4715: time = 1.6876778000150807

I ran it on my laptop with 8 cores available.

YarShev avatar Aug 10 '22 11:08 YarShev

Also, I was wondering if we could do preprocessing (ray.put / client.scatter) for the arguments to be passed into a Ray/Dask remote function? That may drastically improve performance.

From what I ran into while testing, there are some points in the codebase where the arguments are already Ray object IDs/Dask futures when they're passed into a function. I can try investigating further to see if this can be made uniform, perhaps such that Invocable or some other wrapper is always constructed with concrete Python objects as arguments, and then calls ray.put/client.scatter within its constructor.

Please look at my comment here first.

Yes, some of the arguments can already be futures. We should probably go through *args and **kwargs, find non-future objects and make those futures.
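A minimal sketch of that on the Ray side (the helper name is made up; the Dask analogue would check for distributed.Future and call Client.scatter instead):

import ray


def make_args_futures(args, kwargs):
    # Walk positional and keyword arguments, leaving existing futures
    # alone and putting everything else into the object store so that
    # workers receive ObjectRefs uniformly.
    new_args = tuple(
        a if isinstance(a, ray.ObjectRef) else ray.put(a) for a in args
    )
    new_kwargs = {
        k: (v if isinstance(v, ray.ObjectRef) else ray.put(v))
        for k, v in kwargs.items()
    }
    return new_args, new_kwargs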

YarShev avatar Aug 10 '22 11:08 YarShev

Thanks for the pointers. I've removed the Invocable class and replaced it with a type alias for a 3-tuple of (func, args, kwargs). Here are the timings:

  • Ray, master: 0.009s
  • Dask, master: 0.129s
  • Ray, old PR version: 0.012s
  • Dask, old PR version: 0.537s
  • Ray, latest PR version: 0.011s
  • Dask, latest PR version: 0.132s

I'll look into how easy it is to call ray.put/client.scatter over args and kwargs, and see if it's simple enough to include in this PR.
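For reference, the 3-tuple alias amounts to something like the following (the exact name used in the PR may differ):

from typing import Any, Callable, Dict, Tuple

# A call-queue entry: the function to apply plus its positional and
# keyword arguments, kept together without a dedicated wrapper class.
Invocation = Tuple[Callable, Tuple[Any, ...], Dict[str, Any]]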

noloerino avatar Aug 10 '22 21:08 noloerino

This pull request introduces 1 alert when merging 8302bb44f37b04be20300d08ebb39c0d3f73c60c into 3f985ed6864cc1b5b587094d75ca5b2695e4139f - view on LGTM.com

new alerts:

  • 1 for Unused import

lgtm-com[bot] avatar Aug 10 '22 22:08 lgtm-com[bot]

It seems that CI is still failing because of how Dask handles nested tuples: when functions are embedded deeper inside a tuple, Dask appears to evaluate them in a depth-first fashion. Here's a simple example of what I mean:

from distributed import Client

c = Client()

def f1(f, arg):
    print("in f1")
    return f(arg)

def f2(arg):
    print("in f2")
    return arg + 1

future = c.submit(f1, f2, 100) # passes
print(future)
print(c.gather(future))
print("passed")

future = c.submit(f1, (f2, 100)) # crashes
print(future)

print(c.gather(future))

In the first configuration, where f1, f2, 100 are the arguments passed to c.submit, everything passes fine; f1 is properly called with f2, 100 as its arguments. In the second, where the arguments are instead f1, (f2, 100), Dask attempts to evaluate f2() before f1.

stdout/stderr for evidence:

in f1
in f2
101
passed

in f2
2022-08-10 18:44:09,542 - distributed.worker - WARNING - Compute Failed
Key:       f1-db5fb5becdfd2e9b2edf6ef903a81100
Function:  execute_task
args:      ((<function f1 ...>, (<function f2 ...>, 100)))
kwargs:    {}
Exception: 'TypeError("f1() missing 1 required positional argument: \'arg\'")'

Traceback (most recent call last):
  File "<stdin>", line 21, in <module>
  File "/Users/jhshi/opt/miniconda3/envs/modin-dev/lib/python3.10/site-packages/distributed/client.py", line 2210, in gather
    return self.sync(
  File "/Users/jhshi/opt/miniconda3/envs/modin-dev/lib/python3.10/site-packages/distributed/utils.py", line 338, in sync
    return sync(
  File "/Users/jhshi/opt/miniconda3/envs/modin-dev/lib/python3.10/site-packages/distributed/utils.py", line 405, in sync
    raise exc.with_traceback(tb)
  File "/Users/jhshi/opt/miniconda3/envs/modin-dev/lib/python3.10/site-packages/distributed/utils.py", line 378, in f
    result = yield future
  File "/Users/jhshi/opt/miniconda3/envs/modin-dev/lib/python3.10/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/Users/jhshi/opt/miniconda3/envs/modin-dev/lib/python3.10/site-packages/distributed/client.py", line 2073, in _gather
    raise exception.with_traceback(traceback)
TypeError: f1() missing 1 required positional argument: 'arg'

I'm not familiar enough with Dask to say whether this is a bug on Dask's end, since I think it's reasonable for the scheduler to evaluate depth-first, although this behavior is somewhat counterintuitive. It looks like we may have to go back to @jeffreykennethli's original suggestion of just leaving the Dask call queue as a list of flattened tuples.

Another really messy (and possibly confusing) option is to compose call_queue not with Invocables but with the original (func, args, kwargs) tuples. That means we'd go from (func, args, kwargs) in the API layer, convert to Invocable when passing them around the partition layer, and unpack them again in the execution layer, which seems bad. Again, all of this is meant to make the partition layer cleaner, at the cost of... making the partition layer complex again.
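For readers outside the partition layer: the call queue under discussion is essentially a list of pending calls applied in order on a worker. A minimal sketch of what draining it amounts to (names are illustrative, not the PR's exact code):

def drain_call_queue(data, call_queue):
    # Apply each queued (func, args, kwargs) entry to the partition's
    # data in order, feeding each result into the next call.
    for func, args, kwargs in call_queue:
        data = func(data, *args, **kwargs)
    return data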

noloerino avatar Aug 11 '22 01:08 noloerino

@noloerino, please note that you are getting the error because f1 didn't get enough arguments. The following message says as much: Exception: 'TypeError("f1() missing 1 required positional argument: \'arg\'")'. You should unpack the tuple with *(f2, 100) to get this working.

YarShev avatar Aug 11 '22 11:08 YarShev

Also, note that Dask may not work with tuples passed into a remote function. That's why we have used a list for the call queue so far.

YarShev avatar Aug 11 '22 11:08 YarShev

@YarShev, you're right, I set up my minimized example somewhat incorrectly, and I would have needed to unpack the tuple with *. That said, I think it still proves my point about Dask's evaluation order being different, as the in f2 print statement is hit before f1 is even called, which seems to imply that Dask is depth-first evaluating functions within tuples. As you say, this doesn't seem to be the case with lists, so I've now returned call_queue to using lists rather than tuples.

noloerino avatar Aug 15 '22 23:08 noloerino

the in f2 print statement is hit before f1 is even called, which seems to imply that Dask is depth-first evaluating functions within tuples

I don't see this behavior. My output is in f1 first and in f2 next.

YarShev avatar Aug 16 '22 11:08 YarShev

What about this example?

from distributed import Client

c = Client()

def f1(arg):
    print("in f1, received", arg)

def f2(arg):
    print("in f2, received", arg)
    return arg + 1

future = c.submit(f1, (f2, 100))

My output:

in f2, received 100
in f1, received 101

Environment info, in case it's platform-specific:

  • MacOS Monterey (12.4) on Apple Silicon
  • Python 3.10.4
  • Dask 2022.8.0

noloerino avatar Aug 16 '22 15:08 noloerino

Updated benchmarks on the above df.abs() code (avg of 10 runs for each):

  • Ray, master (fd225c52): 0.014s
  • Dask, master: 0.329s
  • Ray, latest PR version: 0.017s
  • Dask, latest PR version: 0.267s

It seems like some recent change to master has caused a slowdown with the Dask backend (compared to my numbers from last week). There was also sizable variance across my runs on master, ranging from 0.208 to 0.544 seconds per run, but regardless it seems like this PR no longer causes a large performance hit.

noloerino avatar Aug 17 '22 16:08 noloerino

future = c.submit(f1, (f2, 100))

@noloerino, yes, now I see the same as you, but I am not sure this is a correct enough way to pass arguments to a remote function. Anyway, we understand that Dask may not work with tuples.
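To make the contrast concrete, a minimal sketch of the two container types (the output comments assume the tuple-as-task behavior observed above and the list pass-through behavior the call queue relies on):

from distributed import Client

c = Client()

def f1(arg):
    return arg

def f2(arg):
    return arg + 1

# A tuple matches Dask's (callable, *args) task convention, so f2 is
# evaluated first and f1 receives its result.
print(c.gather(c.submit(f1, (f2, 100))))  # 101

# A list is not treated as a task, so f1 receives the list itself.
print(c.gather(c.submit(f1, [f2, 100])))  # [<function f2 ...>, 100]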

YarShev avatar Aug 17 '22 18:08 YarShev

Updated benchmarks on the above df.abs() code (avg of 10 runs for each):

  • Ray, master (fd225c52): 0.014s
  • Dask, master: 0.329s
  • Ray, latest PR version: 0.017s
  • Dask, latest PR version: 0.267s

It seems like some recent change to master has caused a slowdown with the Dask backend (compared to my numbers from last week). There was also sizable variance across my runs on master, ranging from 0.208 to 0.544 seconds per run, but regardless it seems like this PR no longer causes a large performance hit.

Can you also run a full axis operation?

YarShev avatar Aug 17 '22 18:08 YarShev

@YarShev, here are benchmarks for the average time of df.describe() over 5 runs:

  • Ray, master (fd225c52): 2.52s
  • Dask, master: 2.86s
  • Ray, latest PR version: 2.51s
  • Dask, latest PR version: 2.89s

noloerino avatar Aug 17 '22 23:08 noloerino

I suggest getting rid of the Invocable stuff for now because I am still seeing significant performance penalties on the workload I am currently profiling. You can find it in https://github.com/YarShev/daal4py/tree/nyc-taxi/examples/notebooks/NYCTaxi-E2E-Demo. The script is large_data.py, which is based on NYCTaxi-E2E-Large-Data-Optimized.ipynb.

With 112 CPUs:

  • master (Ray): whole time = 383.454846611013
  • PR #4715 (Ray): whole time = 701.9466575289844

YarShev avatar Aug 18 '22 08:08 YarShev

I did some more exploring, and I'm having issues reproducing your perf issues locally -- it seems the NYC TLC has changed its data to all be Parquet files and has removed lat/long information from its datasets. Running a modified (and smaller) version of your large_data.py on my laptop doesn't exhibit any slowdowns, though since you said your benchmark was on 112 cores, I would guess this is something I'd have to run on AWS to reproduce.

Regardless, I agree that removing the Invocable abstraction might be the way to go for now, as I would guess the extra deserialize calls are the source of the slowdowns, and unlike Dask, Ray seems not to recursively resolve promises (ray.get([ray.put([ray.put(0)])]) returns [[ObjectRef(...)]] for me). I'll see if I can find a good way to preserve the separation of user args/kwargs without passing around an explicit Invocable object.
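The behavior mentioned above, as a standalone check:

import ray

ray.init()

# Ray resolves only the top-level reference; the ObjectRef stored inside
# the list comes back unresolved, unlike Dask's recursive future resolution.
nested = ray.put([ray.put(0)])
print(ray.get([nested]))  # [[ObjectRef(...)]]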

noloerino avatar Aug 23 '22 00:08 noloerino

@YarShev I've removed the extraneous deserialize calls, which should bring performance back to expected levels. Unfortunately I still had trouble running your benchmark code since I couldn't find the CSV dataset with lat/long information, so could you please try running things again on your end? If the slowdown is still around, I'll see if I can find a comparably large dataset to test against.

noloerino avatar Aug 25 '22 00:08 noloerino

This pull request introduces 1 alert when merging 6398fe61a326dfae855efd1bf823ee548ca4418e into dff84b9cdf373096f88356fe17202599c192c2fc - view on LGTM.com

new alerts:

  • 1 for Unused import

lgtm-com[bot] avatar Aug 25 '22 01:08 lgtm-com[bot]

@noloerino, please make CI green, after that I will run the benchmark in order not to encounter any trouble.

YarShev avatar Aug 25 '22 09:08 YarShev

Should be fixed now @YarShev (the failing FuzzyData-on-Dask check seems to be unrelated and is failing on current master for me locally).

noloerino avatar Aug 30 '22 03:08 noloerino

master whole time: 236.58521463605575

PR4715 whole time: 281.4897154511418

YarShev avatar Aug 30 '22 13:08 YarShev

Let me look into the changes.

YarShev avatar Aug 30 '22 13:08 YarShev

I've refactored deploy_ray_func, which used to rely on some implicit behavior of deserialize, to be a little less confusing, and I've moved the extraneous deserialize call pointed out earlier so that it is more precise and occurs on a worker node.
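A minimal sketch of the idea of resolving nested references on the worker (names are illustrative; the real deploy_ray_func handles the full call queue and is more involved):

import ray


@ray.remote
def deploy_func(func, args_list):
    # Ray auto-resolves ObjectRefs passed as top-level arguments, but not
    # refs nested inside containers such as a call queue, so those are
    # fetched here, on the worker, rather than on the main process.
    args = [ray.get(a) if isinstance(a, ray.ObjectRef) else a for a in args_list]
    return func(*args)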

I tried looking at a few micro-benchmarks on an AWS t2.2xlarge (8 cores, 32GB RAM) instance, but I couldn't find any substantial perf differences between this branch, the previous version of this PR, and latest master on .groupby or repeated .apply calls, though I was able to observe that adding a few extra unnecessary deserialize calls did cause a large slowdown.

@YarShev assuming CI passes, please run your benchmark again to see if anything has changed. If performance continues to be an issue, it would be helpful if we could set up a short meeting some time next week so I can have a reliable benchmark that I can check performance issues with myself.

noloerino avatar Sep 07 '22 23:09 noloerino

Now the perf is on par with master on the Ray engine, but there is a little slowdown on the Dask engine.

YarShev avatar Sep 08 '22 11:09 YarShev

I've already rebased on latest master; Mahesh initiated a rerun of CI for me and it looks like Omnisci tests are passing.

noloerino avatar Sep 14 '22 21:09 noloerino

I think Mahesh re-ran some other stuff. This seems to be failing: https://github.com/modin-project/modin/actions/runs/3048755188/jobs/4914151049

pyrito avatar Sep 14 '22 21:09 pyrito

Thanks, I've gone through and cleaned up the docstrings.

noloerino avatar Sep 20 '22 16:09 noloerino