Made combOp of aggregate() work as a reduce instead of an implicit fold
Thank you for your pull request. An admin will review this request soon.
A fun little demo aggregating an RDD[Char] into a String:
MASTER=local[4] ./spark-shell
...
scala> val l = sc.parallelize("foobarbazquux".toList, 4)
l: spark.RDD[Char] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> l.aggregate("_")((s, c) => s :+ c, (s1, s2) => s1 + s2)
...
res0: java.lang.String = _bar_quux_baz_foo
Run the aggregate multiple times and you'll see different permutations of foo, bar, baz and quux, but the ordering within each partition is always maintained, and there is only one leading zeroValue ("_") where the old implementation would have two.
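To make the one-leading-zeroValue claim concrete, here's that merge replayed with plain Scala collections (the four strings are the per-partition seqOp results implied by res0 above):

val parts = Seq("_bar", "_quux", "_baz", "_foo")  // each partition's seqOp fold starts from "_"
parts.reduce(_ + _)      // "_bar_quux_baz_foo"  -- this PR: plain reduce, one leading "_", matching res0
parts.fold("_")(_ + _)   // "__bar_quux_baz_foo" -- old combOp: the fold seeds a second "_"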
Mark, this seems inconsistent with Scala Collections' aggregate. When you call their aggregate on an empty list, you still get the zeroValue, whereas with this, wouldn't you get a NoSuchElementException?
Alternatively, if this does return the zeroValue because each partition's aggregate will pass that, isn't it the same as the current version?
Just to show what I mean:
scala> import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuffer
scala> List(1,2,3).aggregate(new ArrayBuffer[Int])(_ += _, _ ++= _)
res1: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(1, 2, 3)
scala> List().aggregate(new ArrayBuffer[Int])(_ += _, _ ++= _)
res2: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer()
The zeroValue is not bound into combOp (an operation that potentially needs a different zero than seqOp does), but the zeroValue is still part of the seqOp fold, so there won't be any NoSuchElementException. The results from my aggregate() on your example inputs are exactly the same as for Scala collections.
The difference is that the zeroValue doesn't become the initial element of an implicit fold in combOp. Instead, the first call of optCombOp (when jobResult is still None) effectively becomes the identity function -- i.e. the same as Option(zeroElementForCombOp combOp u) without us having to know or infer what the correct zero for combOp is. (If U couldn't be an AnyVal, then you could avoid using Option and do something similar by initializing jobResult to null and putting an if (jobResult == null) return taskResult else ... at the beginning of combOp(jobResult, taskResult).)
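For reference, a minimal sketch of the Option-based approach (not the actual patch; mergeAsReduce is a name I've made up, with U and combOp as in aggregate's signature):

def mergeAsReduce[U](taskResults: Iterator[U], combOp: (U, U) => U): U = {
  var jobResult: Option[U] = None  // no zeroValue is ever fed to combOp
  for (taskResult <- taskResults) {
    jobResult = jobResult match {
      case None      => Some(taskResult)               // first result: identity, no zero needed
      case Some(acc) => Some(combOp(acc, taskResult))  // later results: ordinary reduce step
    }
  }
  jobResult.getOrElse(sys.error("no task results"))    // the zero-partition case has no answer
}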
For sane uses of aggregate, this shouldn't make any difference; but you can see differences in "creative" uses of aggregate. For example, sc.parallelize(List[Char](), 4).aggregate("*")(_ :+ _, _ + _) will result in **** for my aggregate, whereas the existing aggregate will produce ***** since an additional copy of the zeroValue is currently inserted into combOp. And in the (_ + _, _ * _) example that started off this whole discussion, the current implementation will wipe out the summed results by inserting 0 * _ at the beginning of the combOp fold-that-should-be-a-reduce.
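Spelling out that last example with plain collections (the partition sums here are invented for illustration):

val partitionSums = Seq(3, 7, 5)   // imagine these came from seqOp = _ + _ over three partitions
partitionSums.fold(0)(_ * _)       // 0   -- the leading zeroValue annihilates the sums
partitionSums.reduce(_ * _)        // 105 -- the reduce the docs describe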
But I don't understand why we'd want to support a combOp that's incompatible with the seqOp. Why should the user have to worry about the number of partitions? If you look at Scala's aggregate, it also has inconsistent behavior if you give + as one operation and * as the other:
scala> collection.parallel.ForkJoinTasks.defaultForkJoinPool.setParallelism(1)
scala> List(0,1,2).par.aggregate(0)(_ + _, _ * _)
res20: Int = 3
scala> collection.parallel.ForkJoinTasks.defaultForkJoinPool.setParallelism(2)
scala> List(0,1,2).par.aggregate(0)(_ + _, _ * _)
res22: Int = 0
I think this just means that you're not expected to use aggregate with such incompatible operations. More precisely, the operations passed should be such that they'll give the same result regardless of what the partitions are. Maybe I'm missing something, but I don't think we should complicate the implementation to change the result in this arguably incorrect use case. If a user really wants to be aware of partitions, they can always use mapPartitions anyway.
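For instance, a sketch of that escape hatch using the RDD l from the demo above (only mapPartitions, reduce and foldLeft are assumed; the ops are the demo's):

val perPartition = l.mapPartitions(it => Iterator(it.foldLeft("_")((s, c) => s :+ c)))  // seqOp fold within each partition
perPartition.reduce(_ + _)  // combOp applied as a genuine reduce across the partition results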
Yes, I'm not sure that there is a legitimate use case for a seqOp and combOp that don't share the same zero. If we're willing to say that combOp must be commutative, associative and have the same zero as seqOp, then there is no harm in inserting an extra zeroValue into the combOp reduce, and this pull request is needless complication (although a documentation update would be in order). On the other hand, if we don't want to be that restrictive on combOp, and can see a use for incompatible operations when users have taken control of the relevant details of partitioning (e.g. with a custom partitioner, coalesce, or one of the operations that supports a numPartitions parameter), then we really should implement combOp as a reduce, as the docs claim.
I will agree that Scala does generate some really weird results when incompatible operations are supplied to aggregate over parallel collections. I haven't looked yet at how that aggregate is implemented, but it's not obvious to me what it is doing just from looking at various inputs and results.
My (intended) real point in the mailing list discussion was that its behaviour is somewhat inconsistent with its documentation. If its behaviour is desired (and that is arguably the case), then perhaps the docs should simply be updated to indicate that the operations should be consistent (both associative and commutative, as Mark pointed out) to avoid strange behaviour. If a user really wants to use two "inconsistent" operations, they still have mapPartitions and a reduce (or whatever) available.
Yeah, good point. In that case I'd vote to just update the docs for this but leave it the way it was. One other difference I noticed with fold vs. reduce is that fold will also work on RDDs with zero partitions (which can actually happen sometimes with HDFS files if you give an empty directory), while reduce won't.
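To illustrate that last point (the path here is hypothetical, and the exact error from reduce may vary by version):

val empty = sc.textFile("hdfs:///some/empty/dir")  // an empty directory can yield an RDD with zero partitions
empty.fold("")(_ + _)   // still returns the zeroValue "" -- the fold survives zero partitions
empty.reduce(_ + _)     // fails -- a reduce has no first element to start from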