[FEATURE] Spark Take 1 row without sorting optimization

Status: Open. goodwanghan opened this issue 2 years ago • 2 comments

Is your feature request related to a problem? Please describe. Look here.

If we take just one row without sorting, we can use GROUP BY and FIRST instead, which can be a lot faster. Let's add this special handling.
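To make the idea concrete without a Spark cluster, here is a plain-Python model (not Spark code) of two ways to take one row per key when no order is required. The sort-based version mirrors a general take path: sort, then keep the first row of each group. The aggregation version mirrors GROUP BY + FIRST: a single pass that keeps the first row seen for each key. The function names and the dict-based row format are hypothetical illustrations.

```python
from itertools import groupby
from operator import itemgetter
from typing import Any, Dict, List

Row = Dict[str, Any]


def take_one_by_sorting(rows: List[Row], key: str) -> List[Row]:
    """Sort-based path: sort by the key (O(n log n)), then keep each group's first row."""
    ordered = sorted(rows, key=itemgetter(key))
    # groupby yields consecutive runs of equal keys; next() takes the run's first row
    return [next(group) for _, group in groupby(ordered, key=itemgetter(key))]


def take_one_by_aggregation(rows: List[Row], key: str) -> List[Row]:
    """GROUP BY + FIRST analogue: a single O(n) pass keeping the first row seen per key."""
    first_seen: Dict[Any, Row] = {}
    for row in rows:
        # setdefault stores the row only if this key has not been seen yet
        first_seen.setdefault(row[key], row)
    return list(first_seen.values())
```

With `rows = [{"k": "a", "v": 3}, {"k": "b", "v": 1}, {"k": "a", "v": 2}]`, both functions return exactly one row per key. When no presort is requested, any row of a group is an acceptable answer, so the single pass does the same job without paying for the sort.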

goodwanghan, Nov 02 '21 17:11

@kvnkho @goodwanghan Is this issue taken? I would love to jump on this. Any pointers would be awesome.

satya-nutella, Jul 28 '22 19:07

Hi @meehawk, good to see you in the Fugue repo this time! Thanks for the interest; I still remember you from the Prefect repo. This issue is not taken, so you can take a stab at it. Here is some initial guidance.

There is a WorkflowDataFrame class over here. It is part of the DAG that Fugue creates to express a computation workflow, and the DAG then pushes the methods down to the underlying execution engine (Pandas, Spark, or Dask). In the same file you will find a take() method, which calls the Take processor. Take is a generic abstraction over Pandas take, Spark take, and Dask take; notice that its last line calls the take of the underlying execution engine.
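The layering described above (a lazily built DAG node, an engine-agnostic Take processor, and an engine-specific take) can be sketched in plain Python. Every class here is hypothetical and far simpler than Fugue's real WorkflowDataFrame, Take processor, and execution engines; it only illustrates the delegation pattern, which is presumably why an optimization like this one can stay inside the Spark engine's take.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class ExecutionEngineSketch(ABC):
    """Hypothetical stand-in for an execution engine (Pandas, Spark, Dask)."""

    @abstractmethod
    def take(self, rows: List[Any], n: int) -> List[Any]:
        ...


class ListEngine(ExecutionEngineSketch):
    """Toy engine operating on Python lists."""

    def take(self, rows: List[Any], n: int) -> List[Any]:
        return rows[:n]


class TakeProcessorSketch:
    """Engine-agnostic step: it knows n, but not how any engine implements take."""

    def __init__(self, n: int):
        self.n = n

    def process(self, engine: ExecutionEngineSketch, rows: List[Any]) -> List[Any]:
        # As described above, the final step delegates to the engine's own take
        return engine.take(rows, self.n)


class WorkflowNodeSketch:
    """Hypothetical DAG node: records steps lazily and runs them on an engine later."""

    def __init__(self, rows: List[Any]):
        self.rows = rows
        self.steps: List[TakeProcessorSketch] = []

    def take(self, n: int) -> "WorkflowNodeSketch":
        self.steps.append(TakeProcessorSketch(n))
        return self

    def run(self, engine: ExecutionEngineSketch) -> List[Any]:
        result = self.rows
        for step in self.steps:
            result = step.process(engine, result)
        return result
```

Swapping `ListEngine` for another engine subclass changes how take runs without touching the workflow or the processor.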

You can find the take implementation for each execution engine here:

  • Pandas - https://github.com/fugue-project/fugue/blob/838fdaa794c62e8bdc7f1474818d9491d5d39ed7/fugue/execution/native_execution_engine.py#L320-L354
  • Spark - https://github.com/fugue-project/fugue/blob/838fdaa794c62e8bdc7f1474818d9491d5d39ed7/fugue_spark/execution_engine.py#L471-L530
  • Dask - https://github.com/fugue-project/fugue/blob/838fdaa794c62e8bdc7f1474818d9491d5d39ed7/fugue_dask/execution_engine.py#L375-L429

This specific issue is an optimization for the Spark code, but to understand it you need to know how take behaves. Fugue has a unified test suite for all the backends so that users get the same behavior whichever backend they choose. You can find the tests for take here; the page also shows how to run them. The engine in those tests is the execution engine; for Spark, for example, it's this class.

As the tests show, the important thing to know about take is this: if no partition is specified, it just returns n rows; if a partition is specified, it returns n rows for EACH partition.
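Those two rules can be modeled in a few lines of plain Python (no presort). This is a hypothetical illustration, not Fugue's implementation: `take_model` and its `partition_by` parameter are made-up names, and the model returns rows in input order, which a distributed engine such as Spark would not necessarily preserve.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def take_model(rows: List[Row], n: int, partition_by: Optional[List[str]] = None) -> List[Row]:
    """Model of take semantics without a presort.

    - No partition: return the first n rows overall.
    - With partition_by: return up to n rows for EACH distinct key combination.
    """
    if not partition_by:
        return rows[:n]
    counts: Dict[Tuple[Any, ...], int] = {}
    result: List[Row] = []
    for row in rows:
        key = tuple(row[c] for c in partition_by)
        # Keep the row only while this partition still has fewer than n rows
        if counts.get(key, 0) < n:
            counts[key] = counts.get(key, 0) + 1
            result.append(row)
    return result
```

For example, with partition key `k` and `n=2`, a partition holding three rows contributes two, and a partition holding one row contributes one.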

This issue proposes special handling that uses the native Spark groupby().first() when n is 1, because this avoids the need to sort each group. An example of this function can be found here. It is relatively new (PySpark 3.1). This would avoid the sorting logic that this take method currently uses. I think you also need to check whether the partition has a presort: if it is ascending, use first, and if it is descending, use last. You can read more about Fugue's partitions here if you are interested, but the gist is that you partition by key and can also specify a presort if wanted.
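Here is a plain-Python model of a sort-free "take 1 per partition" that also honors a single-column presort. It swaps in a different technique from the first/last suggestion above: the Spark documentation describes first/last as non-deterministic after a shuffle, so they reflect the presort only if rows already arrive sorted. The model instead keeps the min (ascending) or max (descending) row per group in one pass. The function and parameter names are hypothetical; multi-column presorts and null handling are deliberately ignored.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def take_one_per_group(
    rows: List[Row], key: str, presort_col: Optional[str] = None, ascending: bool = True
) -> List[Row]:
    """One-pass, sort-free model of take(n=1) per partition.

    - No presort: keep the first row seen per key (the GROUP BY + FIRST idea).
    - Ascending presort: keep the row with the smallest presort value.
    - Descending presort: keep the row with the largest presort value.
    Assumes presort values are non-null and mutually comparable.
    """
    best: Dict[Any, Row] = {}
    for row in rows:
        k = row[key]
        if k not in best:
            best[k] = row
        elif presort_col is not None:
            current = best[k][presort_col]
            candidate = row[presort_col]
            better = candidate < current if ascending else candidate > current
            # Replace only on strict improvement, so ties keep the earlier row
            if better:
                best[k] = row
    return list(best.values())
```

Each group needs only one comparison per incoming row, so the work stays linear in the number of rows instead of requiring a per-group sort.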

This may be a lot to digest; feel free to also ping us in the Slack channel at slack.fugue.ai, and we can discuss this more.

kvnkho, Jul 29 '22 03:07