Adding dropTake to RDD. Allows dropping the first 'drop' elements and then taking the next 'num' elements
- Added a dropTake(drop: Int, num: Int) method which drops the first 'drop' elements and then takes the next 'num' elements.
- Added a test for take(num: Int).
- Added a test for dropTake(drop: Int, num: Int).
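For context, a minimal sketch of what such an iterative dropTake might look like. This is my reconstruction, not the PR's actual diff; it assumes a Spark version where SparkContext.runJob(rdd, func, partitions) is available, and borrows the leftToDrop name quoted later in this thread:

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Hypothetical reconstruction: scan partitions one at a time, dropping
// until `drop` elements have been skipped, then collecting until `num`
// elements have been taken.
def dropTake[T: ClassTag](rdd: RDD[T], drop: Int, num: Int): Array[T] = {
  val buf = new ArrayBuffer[T]
  var leftToDrop = drop
  var partIndex = 0
  while (buf.size < num && partIndex < rdd.partitions.length) {
    val toDrop = leftToDrop       // local copies, so the closure captures
    val left = num - buf.size     // immutable values, not the driver-side vars
    // Run a job on just this partition; it reports how many elements it
    // dropped plus the elements it took, so the driver can update counters.
    val res = rdd.sparkContext.runJob(rdd, (it: Iterator[T]) => {
      var dropped = 0
      while (dropped < toDrop && it.hasNext) { it.next(); dropped += 1 }
      (dropped, it.take(left).toArray)
    }, Seq(partIndex))
    val (dropped, taken) = res(0)
    leftToDrop -= dropped
    buf ++= taken
    partIndex += 1
  }
  buf.toArray
}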
Thank you for your pull request. An admin will review this request soon.
Since I am not very familiar with the accumulator usage, I will let someone else comment! (On the face of it, it should work unless I am missing some subtlety.)
I think the current test only exercises one of the branches in
val taken = if (leftToDrop > 0) it.take(0) else it.take(left)
To cover more cases, I'd add a test where we drop one or more complete partitions before taking results from the remaining partitions.
I added these tests (sketched below). Given the values 1..16 in 4 partitions:
- drop none
- drop only from the first partition, take 2
- drop 6 (4+2: all 4 from the first partition and 2 from the second), take 2
- drop 10 (4+4+2: all of the first and second partitions and 2 from the third), take 6 values; the take should spill over into the next partition
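Something like the following, assuming the dropTake(drop, num) signature from this PR and a local SparkContext sc (the assertions are my illustration, not the PR's actual test code):

// Values 1..16 in 4 partitions: [1-4], [5-8], [9-12], [13-16]
val rdd = sc.parallelize(1 to 16, 4)

assert(rdd.dropTake(0, 2).toSeq == Seq(1, 2))   // drop none
assert(rdd.dropTake(2, 2).toSeq == Seq(3, 4))   // drop only from the first partition
assert(rdd.dropTake(6, 2).toSeq == Seq(7, 8))   // drop partition 1 plus 2 from partition 2
// drop partitions 1 and 2 plus 2 from partition 3; the take spills into partition 4
assert(rdd.dropTake(10, 6).toSeq == Seq(11, 12, 13, 14, 15, 16))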
Ping!
Sorry for taking a while to get to this. I'm curious, what do you want to use this operation for? It seems potentially limited in use because it's going to scan the whole RDD linearly.
This provides LIMIT- and top-K-style results as in SQL/LINQ. Sometimes it is useful to page through just 2-3 pages (of some page size) of the top-K results. This does not fetch the whole RDD the way collect does.
A) Instead of iteratively dropping, it would be better to (see the sketch after point B below):
a) Count records per partition.
b) Compute the records to drop and the records to keep per partition (at the master - this per-partition dataset is small).
c) Apply a filter per partition based on (b) to generate a new RDD.
Of course, this assumes that:
- in the normal case, you touch more than one partition.
- the amount of data after the dropTake needs to be an RDD - particularly when it is too large to be collected at the master.
B) The above will also get rid of the need to do 'collect().drop(drop)' as mentioned in the comment, since that will also not scale with the size of the RDD.
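An illustrative sketch of the count-then-filter plan (my code with names of my choosing, not Mridul's actual implementation):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def dropTakeAsRDD[T: ClassTag](rdd: RDD[T], drop: Int, num: Int): RDD[T] = {
  // (a) count records per partition, in one parallel pass over the data
  val counts: Array[Long] = rdd
    .mapPartitionsWithIndex((i, it) => Iterator((i, it.size.toLong)))
    .collect()
    .sortBy(_._1)   // collect preserves partition order; sorted for clarity
    .map(_._2)

  // (b) at the master, compute per partition how many records to skip and
  // keep; this metadata is one pair of numbers per partition, so it is small
  val before = counts.scanLeft(0L)(_ + _)   // records in earlier partitions
  val ranges: Array[(Long, Long)] = counts.indices.map { i =>
    val skip = math.min(math.max(drop - before(i), 0L), counts(i))
    val keep = math.max(math.min(drop.toLong + num - before(i), counts(i)) - skip, 0L)
    (skip, keep)
  }.toArray

  // (c) slice each partition according to (b); note this recomputes the RDD,
  // so caching it first would avoid a second full evaluation
  rdd.mapPartitionsWithIndex { (i, it) =>
    val (skip, keep) = ranges(i)
    it.drop(skip.toInt).take(keep.toInt)  // assumes per-partition counts fit in an Int
  }
}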
A) a) Isn't counting itself a linear operation?
- Actually, in my use case it is likely that I touch only one partition. But yes, this may well be more than one partition.
- When doing limit and take, the number of elements to take is usually small. This operation is similar to 'take' except that we allow the user to skip a certain number of pages in the result.
- Let me explain it this way:
Assuming "T" seconds to process a partition (for simplicity's sake, count/drop/etc. all take the same time - though that is not accurate): with this PR, if you need to process N partitions, you take N * T seconds. With my proposal, you will take at most about 2 * T * P seconds, where P is num_partitions/num_cores; ideally, P should be 1. (A slightly gross simplification for illustration purposes - data locality, etc. come into play; hope you get the idea :-) )
Though count is linear, all partitions get scheduled in parallel: so unless you are running on a single core, or the number of partitions is much higher than the number of cores, you won't take num_partitions * time_per_partition. Ideally, the cost of a distributed count == the cost of a single partition's count: so it is agnostic to the number of partitions for reasonable counts.
Btw, we are not doing a global count - but a count per partition.
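To make the back-of-envelope formulas above concrete, here is the arithmetic with hypothetical numbers (N = 16 partitions, 8 cores, T = 1 second are my illustration, not figures from the thread):

val numPartitions = 16    // N (hypothetical)
val numCores = 8          // hypothetical cluster size
val t = 1.0               // T, seconds per partition (hypothetical)

// Iterative dropTake touching all N partitions, one job at a time:
val iterative = numPartitions * t            // N * T = 16 seconds

// Count-then-filter: two parallel passes, each taking about T * P seconds:
val p = numPartitions.toDouble / numCores    // P = 16 / 8 = 2
val countThenFilter = 2 * t * p              // 2 * T * P = 4 seconds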
- Specifically for some requirements, I agree it would be a feasible solution, and it is probably the ideal solution for small values of offset and count! To the point where I think even a general solution should keep this path around as an optimization for when the values are small enough (that is, when we estimate that we touch only a small number of partitions: num_touched <<< num_partitions, even if num_touched >> 1).
But as a general construct on RDD, which might get used for all use cases, I am worried about the performance implications - particularly the ability to cause OOM and bad scaling when used on large datasets.
Thanks! I understand and agree. I will whip up something along these lines soon and update this PR.
If your use case is paging with small values of "drop" and "take", then consider just calling take() on the first few thousand elements and paging through that as a local collection. Otherwise this seems like a very specific thing to add for just one use case.
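For instance, something like this (my sketch, with hypothetical page-size parameters, assuming rdd: RDD[Int]):

// Fetch one batch up front, then page through it locally.
val pageSize = 50                           // hypothetical paging parameters
val maxPages = 40
val batch = rdd.take(pageSize * maxPages)   // first few thousand elements

def page(n: Int): Seq[Int] =
  batch.slice(n * pageSize, (n + 1) * pageSize).toSeq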
A parallel slicing operation like the one Mridul describes could also be good, but again I'd like to understand where it will be used, because it seems like a complicated operation to add and to implement well. Just as an example, the per-partition counts might be better off being cached across calls to dropTake if it will be used for paging.
Thank you for your pull request. An admin will review this request soon.
Any movement on this request? I could use this.
I was whipping up a generic tool in which I needed to allow the user to page through the top-K elements. Since a generic tool is a one-off use case, I am using my own solution (this PR as of the last update). Since this is driven by a human user, they will only page through a small number of pages, so the solution I have works better for me than the parallel slicing Mridul suggested.
I agree with Matei that this might be dangerous to have around as a generic RDD construct.
That said, I have experimented with Mridul's approach and might have the code lying around. So if we intend to have this PR pulled, I will update the code and test cases.