
Adding dropTake to RDD. Allows dropping the first 'drop' elements and then taking the next 'take' elements

Open · trulite opened this pull request 11 years ago • 15 comments

  1. Added a dropTake(drop: Int, num: Int) method which drops the first 'drop' elements and then takes the next 'num' elements.
  2. Added a test for take(num: Int).
  3. Added a test for dropTake(drop: Int, num: Int).
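
A minimal sketch of what such an operation could look like - illustrative only, not the PR's actual code, and it assumes the modern RDD/runJob API:

 import scala.reflect.ClassTag
 import org.apache.spark.rdd.RDD

 // Sketch: scan partitions in order, skipping `drop` elements before
 // collecting the next `num`. One job per partition, as take() does.
 def dropTake[T: ClassTag](rdd: RDD[T], drop: Int, num: Int): Array[T] = {
   val buf = scala.collection.mutable.ArrayBuffer.empty[T]
   var leftToDrop = drop
   var p = 0
   while (buf.size < num && p < rdd.partitions.length) {
     val toSkip = leftToDrop
     val toTake = num - buf.size
     // Skip and take inside the executor so only the needed elements
     // travel back to the driver.
     val (dropped, taken) = rdd.sparkContext.runJob(
       rdd,
       (it: Iterator[T]) => {
         var d = 0
         while (d < toSkip && it.hasNext) { it.next(); d += 1 }
         val kept = scala.collection.mutable.ArrayBuffer.empty[T]
         while (kept.size < toTake && it.hasNext) { kept += it.next() }
         (d, kept.toArray)
       },
       Seq(p)
     ).head
     leftToDrop -= dropped
     buf ++= taken
     p += 1
   }
   buf.toArray
 }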

trulite avatar May 19 '13 15:05 trulite

Thank you for your pull request. An admin will review this request soon.

AmplabJenkins avatar May 19 '13 15:05 AmplabJenkins

Since I am not very familiar with the usage of accumulators, I will let someone else comment! (On the face of it, it should work unless I am missing some subtlety.)

mridulm avatar May 19 '13 19:05 mridulm

I think the current test only exercises one of the branches in

 val taken = if (leftToDrop > 0) it.take(0) else it.take(left)

To cover more cases, I'd add a test where we drop one or more complete partitions before taking results from the remaining partitions.

JoshRosen avatar May 19 '13 20:05 JoshRosen

I added these tests. Given values 1..16 in 4 partitions:

  1. drop none
  2. drop only from the first partition; take 2
  3. drop 6 (all 4 from the first partition and 2 from the second); take 2
  4. drop 10 (all of the first and second partitions and 2 from the third); take 6, so the take spills over into the next partition
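
For reference, here is roughly what those four cases look like as assertions - a sketch assuming a local SparkContext `sc` and a dropTake method on the RDD (take sizes for case 1 are my guess); cases 3 and 4 also cover the complete-partition-drop branch JoshRosen asked about:

 // 1..16 in 4 partitions: [1..4] [5..8] [9..12] [13..16]
 val rdd = sc.parallelize(1 to 16, 4)

 assert(rdd.dropTake(0, 2).toSeq  == Seq(1, 2))                    // 1. drop none
 assert(rdd.dropTake(2, 2).toSeq  == Seq(3, 4))                    // 2. drop within the first partition
 assert(rdd.dropTake(6, 2).toSeq  == Seq(7, 8))                    // 3. drop 4 + 2, take 2
 assert(rdd.dropTake(10, 6).toSeq == Seq(11, 12, 13, 14, 15, 16))  // 4. drop 4 + 4 + 2, take 6 (spills over)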

trulite avatar May 20 '13 12:05 trulite

Ping!

trulite avatar Jun 23 '13 01:06 trulite

Sorry for taking a while to get to this. I'm curious, what do you want to use this operation for? It seems potentially limited in use because it's going to scan the whole RDD linearly.

mateiz avatar Jun 23 '13 17:06 mateiz

This provides LIMIT and top-K results as in SQL/LINQ. Sometimes it is useful to page through just 2-3 pages (of some page size) of the top-K results. Unlike collect, this will not fetch the whole RDD.

trulite avatar Jun 23 '13 23:06 trulite

A) Instead of iteratively dropping, it would be better to:

  a) Count records per partition.
  b) Compute records to drop and records to keep per partition (at the master - this per-partition dataset is small).
  c) Apply a filter per partition based on (b) to generate a new RDD.

Of course, this assumes that:

  1. in the normal case, you touch more than one partition.
  2. the amount of data after the dropTake needs to be an RDD - particularly when it is too large to be collected at the master.

B) The above also gets rid of the need to do 'collect().drop(drop)' as mentioned in the comment, since that will not scale with the size of the RDD either.
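
A rough sketch of this count-then-slice approach - hypothetical code, with `dropTakeRDD` and the exact plumbing being my own illustration:

 import scala.reflect.ClassTag
 import org.apache.spark.rdd.RDD

 // (a) count per partition, (b) compute per-partition slice ranges at the
 // master, (c) apply them in one parallel pass to produce a new RDD.
 def dropTakeRDD[T: ClassTag](rdd: RDD[T], drop: Long, num: Long): RDD[T] = {
   // (a) one parallel pass to count records per partition
   val counts: Array[Long] = rdd
     .mapPartitionsWithIndex { (i, it) => Iterator((i, it.size.toLong)) }
     .collect()
     .sortBy(_._1)
     .map(_._2)

   // (b) cumulative offsets give each partition's global start index
   val offsets = counts.scanLeft(0L)(_ + _)
   val ranges: Array[(Long, Long)] = counts.indices.map { i =>
     val from  = math.max(drop - offsets(i), 0L)
     val until = math.min(drop + num - offsets(i), counts(i))
     (from, math.max(until, from))            // empty range if no overlap
   }.toArray

   // (c) second parallel pass: slice each partition to its range
   rdd.mapPartitionsWithIndex { (i, it) =>
     val (from, until) = ranges(i)
     it.slice(from.toInt, until.toInt)        // assumes per-partition counts fit in Int
   }
 }

Note that both passes recompute the source RDD unless it is cached, and for repeated paging the per-partition counts could be cached across calls.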

mridulm avatar Jun 24 '13 03:06 mridulm

A) a) Isn't counting itself a linear operation?

  1. Actually, in my use case it is likely that I touch only one partition. But yes, this may well be more than one partition.
  2. When doing LIMIT and take, the number of elements to take is usually small. This operation is similar to 'take' except that we allow the user to skip a certain number of pages in the result.

trulite avatar Jun 24 '13 11:06 trulite

  1. Let me explain it this way:

Assume "T" seconds to process a partition (for simplicity's sake, count/drop/etc. all take the same time, though that is not accurate). In this PR, if you need to process N partitions, you take N * T seconds. In my proposal, you take at most about 2 * T * P seconds, where P is num_partitions/num_cores; ideally, P should be 1. For example, with 100 partitions on 50 cores, the iterative scan can cost up to 100 * T while the two parallel passes cost about 4 * T. (A slightly gross simplification for illustration purposes - data locality, etc. come into play - but hope you get the idea :-) )

Though the count is linear, all partitions get scheduled in parallel: so unless you are running on a single core, or the number of partitions is much higher than the number of cores, you won't take num_partitions * time_per_partition. Ideally, the cost of the distributed count == the cost of a single partition's count: so it is agnostic to the number of partitions for reasonable counts.

Btw, we are not doing a global count - but a count per partition.

  2. Specifically for some requirements, I agree - it would be a feasible solution, and probably is the ideal solution for small values of offset and count! To the point where, I think, even a general solution should keep this around as an optimization for when the values are small enough (if we estimate that we touch only a small number of partitions: num_touched <<< num_partitions, even if num_touched >> 1).

But as a general RDD construct, which might get used for all use cases, I am worried about the performance implications - particularly the ability to cause OOM and bad scaling - when used on large datasets.

mridulm avatar Jun 24 '13 12:06 mridulm

Thanks! I understand and agree. I will whip up something along these lines soon and update this PR.

trulite avatar Jun 24 '13 12:06 trulite

If your use case is paging with small values of "drop" and "take", then consider just calling take() on the first few thousand elements and paging through that as a local collection. Otherwise this seems like a very specific thing to add for just one use case.

A parallel slicing operation like Mridul suggests could also be good, but again I'd like to understand where it will be used, because it seems like a complicated operation to add and to implement well. Just as an example, the counts there might be better off being cached across calls to dropTake if it will be used for paging.
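
A sketch of that prefix-then-page-locally suggestion, assuming an existing rdd: RDD[Int] (the bound and the helper name are illustrative):

 // Fetch a bounded prefix once, then page through it on the driver.
 val prefix: Array[Int] = rdd.take(5000)      // 5000 is an arbitrary bound, never the whole RDD
 def page(n: Int, size: Int): Array[Int] =
   prefix.slice(n * size, (n + 1) * size)     // e.g. page(2, 50) -> elements 100..149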

mateiz avatar Jun 24 '13 13:06 mateiz

Thank you for your pull request. An admin will review this request soon.

AmplabJenkins avatar Aug 05 '13 21:08 AmplabJenkins

Any movement on this request? I could use this.

DomingoExabeam avatar Sep 20 '13 00:09 DomingoExabeam

I was whipping up a generic tool in which I needed to allow the user to page through the top-K elements. Since a generic tool is a one-off use case, I am using my solution (this PR as of the last update). Since this is driven by a human user, they will only page through a small number of pages, so the solution I have works better for me than the parallel slicing one Mridul suggested.

I agree with Matei that this might be dangerous to have around as a generic RDD construct.

That said, I have experimented with Mridul's approach and may have the code lying around. So if we intend to have this PR pulled in, I will update the code and test cases.

trulite avatar Sep 20 '13 10:09 trulite