group randomly is not balanced
groupRandomly uses nextInt(numReducers), which means we get huge reducer skew.
@ianoc-stripe Not sure I totally understand. Are you saying we should just use nextInt() to distribute it evenly over the Integer space?
Exactly! -- otherwise we are hashing the reduced integer space and then overloading a few keys.
I think this is a birthday problem issue. I think we want at least something like numReducers * numReducers.
Is there any downside to the full integer space?
Well, the groups will have fewer entries, whereas ideally we could apply a mapValueStream to a bigger block of them for some use cases.
I think the ideal would be 1 group per reducer, but doing this randomly creates skew.
Fair, not sure how the distribution over reducers would look at n^2 or how common that use case is. If n^2 covers it, we might as well use that for the bound.
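For a rough sense of the birthday-problem effect being described, here is a quick plain-Scala sketch (MurmurHash3 just stands in for whatever hashing happens between the reduce key and the partition, which is an assumption; the names are only for illustration):
import scala.util.hashing.MurmurHash3

// how distinctKeys distinct bucket ids spread over `partitions` partitions
// once the id is hashed; compare n ids vs n*n ids over the same n partitions
def partitionCounts(distinctKeys: Int, partitions: Int): Vector[Int] = {
  val counts = new Array[Int](partitions)
  (0 until distinctKeys).foreach { k =>
    counts(math.abs(MurmurHash3.stringHash(k.toString) % partitions)) += 1
  }
  counts.toVector
}

val n = 50
val few  = partitionCounts(n, n)     // expect empty partitions and doubled-up ones
val many = partitionCounts(n * n, n) // expect roughly n ids per partition
println(s"n ids over n partitions:   empty = ${few.count(_ == 0)}, max = ${few.max}")
println(s"n^2 ids over n partitions: empty = ${many.count(_ == 0)}, max = ${many.max}")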
yeah, I'm not too uptight about it, but having fewer groups seems like it could be useful. Actually, I can't replicate the skew in a simple repl session:
scala> def bucket(n: Int, k: Int, recs: Int, seed: Int): Vector[Int] = {
| val rng = new java.util.Random(seed)
| val m = (0 until recs).iterator.map { _ => rng.nextInt(k) }.toList.groupBy(_ % n).mapValues(_.size)
| (0 until n).map(m(_)).toVector
| }
scala> def spread(vs: Vector[Int]) = vs.max - vs.min
spread: (vs: Vector[Int])Int
scala> spread(bucket(10, 10, 1000, 123))
res1: Int = 47
scala> spread(bucket(10, 10, 10000, 123))
res2: Int = 127
scala> spread(bucket(10, 100, 10000, 123))
res3: Int = 127
did I make some silly error?
Yep -- well, your code relies on the fact that integer hash codes on the JVM are the identity.
def bucket(n: Int, k: Int, recs: Int, seed: Int): Vector[Int] = {
  val rng = new java.util.Random(seed)
  val m = (0 until recs).iterator.map { _ => val l = rng.nextInt(k); (l, l.toString) }.toList.groupBy(_.hashCode % n).mapValues(_.size)
  (0 until n).map(s => m.getOrElse(s, 0)).toVector
}
scala> spread(bucket(10, 10, 10000, 123))
res3: Int = 2000
scala> spread(bucket(10, 10, 1000, 123))
res4: Int = 198
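That also makes the source of the lumpiness easy to see directly: there are only 10 distinct keys, and their tuple hash codes mod 10 both collide and can be negative (negative ones never land in 0 until n, so they show up as empty buckets in the getOrElse version). A quick check along the same lines:
// one entry per distinct key: repeated values are buckets receiving several
// keys' worth of records, negative values are keys that fall outside 0 until n
(0 until 10).map { l => (l, l.toString).hashCode % 10 }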
@ianoc @johanoskarsson @benpence
Been struggling with the same issue as well! I notice that some reducers get much more data than others (some reducers don't get any data at all!). Wondering if you guys have a better solution than TypedPipe.groupRandomly()? In my case, I have to make around 50K partitions by applying groupRandomly to a very large TypedPipe. Thanks in advance for your time!
If you look at groupRandomly(buckets), it is basically the same as:
def groupRandomly[A](tp: TypedPipe[A], buckets: Int): Grouped[Int, A] = {
  lazy val rng = new Random(123L)
  tp.groupBy { a =>
      val raw = rng.nextInt(buckets) + a.hashCode()
      val mod = raw % buckets
      if (mod >= 0) mod else mod + buckets
    }
    .withReducers(buckets)
}
So you could change it to anything you want. For instance, you can try this:
def groupSequentially[A](tp: TypedPipe[A], buckets: Int): Grouped[Int, A] = {
  var idx = 0
  tp.groupBy { _ =>
      val mod = idx % buckets
      idx += 1
      if (mod >= 0) mod else mod + buckets
    }
    .withReducers(buckets)
}
which would do a round-robin assignment from one bucket to the next.
You could also just drop the modulus in the original example:
def groupRandomly[A](tp: TypedPipe[A], buckets: Int): Grouped[Int, A] = {
  lazy val rng = new Random(123L)
  tp.groupBy { a => rng.nextInt() + a.hashCode() }
    .withReducers(buckets)
}
Which I think is probably fine... I'm not really sure of the motivation for the modulus, but we have been reluctant to make a change...
I think the bug here probably should be fixed by just removing the modulus, since it assumes you are not going to get collisions in your hash. But I guess we are hashing the bucket number, there are collisions in that hashing function, and that's what is creating the lumpiness.
It may have been exacerbated in the past, when we might have used a different hashing function there, but we didn't have tests for this lumpiness.
This isn't an intrinsic operation of scalding or hadoop, so you can implement your own.
The modulo iirc was due to the fact that spreading the key over the full range means ~most things end up as separate keys to hadoop/scalding, which means some things like iterator operations or mapValues can be a bit slower?
Thanks for sharing @johnynek!
To be honest, I don't quite understand why you need to add a.hashCode() when computing .groupBy { a => rng.nextInt() + a.hashCode() } in the last function you mentioned. Also, wouldn't you finally need to mod by buckets to get a grouping number between 0 and buckets - 1? Something like this:
def groupRandomly[A](tp: TypedPipe[A], buckets: Int): Grouped[Int, A] = {
  lazy val rng = new Random(123L)
  tp.groupBy { _ => rng.nextInt(Integer.MAX_VALUE) % buckets }
    // specifying Integer.MAX_VALUE returns only non-negative random numbers
    .withReducers(buckets)
}
Thanks again for your time @johnynek, really appreciate it!
The motivation for the hashCode is so that two separate mappers won't bucket in the same sequence: the randomness also depends on the input data, which differs between mappers.
But you must seed the RNG, or repeated runs will give different output.
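A sketch of the combined idea (the function name is mine, and I'm assuming the usual typed-API imports): the fixed seed keeps runs repeatable, adding a.hashCode makes the sequence depend on the input as well, and the final branch folds negative remainders back into 0 until buckets, as in the earlier versions.
import java.util.Random
import com.twitter.scalding.typed.{ Grouped, TypedPipe }

def groupRandomlySeeded[A](tp: TypedPipe[A], buckets: Int): Grouped[Int, A] = {
  // seeded so repeated runs agree; a.hashCode mixes in the input data
  lazy val rng = new Random(123L)
  tp.groupBy { a =>
      val mod = (rng.nextInt() + a.hashCode) % buckets
      if (mod >= 0) mod else mod + buckets
    }
    .withReducers(buckets)
}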
I see, makes sense!
The modulo iirc was due to the fact that spreading the key over the full range means ~most things end up as separate keys to hadoop/scalding, which means some things like iterator operations or mapValues can be a bit slower?
ahh yeah, that makes sense... we actually want the fewest possible groups to maximize the efficiency of the iterator-based reduce on the other side...
In theory that makes sense, I wonder how often that is an issue...
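As a plain-Scala illustration of that trade-off (numbers picked arbitrarily): spreading the key over the full Int range gives roughly one reduce group per record, while bucketing caps the number of groups at buckets, which is what lets the iterator-based reduce work over big blocks.
val rng = new scala.util.Random(123L)
val records = 100000
val buckets = 50

// number of distinct reduce keys under each scheme
val fullRangeGroups = Iterator.fill(records)(rng.nextInt()).toVector.distinct.size
val bucketedGroups  = Iterator.fill(records)(rng.nextInt(buckets)).toVector.distinct.size

println(s"full Int range: $fullRangeGroups groups for $records records")
println(s"mod $buckets buckets: $bucketedGroups groups for $records records")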
In any case, the real fix, I think, is to avoid rehashing the bucket here and not treat it as a plain int. For instance, maybe the real fix is to make a
case class Bucket(toInt: Int)

object Bucket {
  implicit val ordSer: OrderedSerialization[Bucket] = ...
}
where the hashCode on Bucket and the ordered serialization just use the bucket id. I think using:
https://github.com/twitter/scalding/blob/b6d465f6fc308d66e5a53c62662d5860846d8dc4/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Hasher.scala#L74
is using murmur hashing on the int, which we don't actually want here.
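A minimal sketch of that idea (the OrderedSerialization instance is left out, same as above): the only point is that Bucket's hashCode is the bucket id itself rather than a murmur of it, so if the partitioner ends up doing something like hash % partitions, each bucket maps to its own reducer.
// case classes normally murmur-hash their fields, so override hashCode to be
// the raw bucket id; the OrderedSerialization (elided) would hash, compare,
// and serialize the same raw int
case class Bucket(toInt: Int) {
  override def hashCode: Int = toInt
}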