
group randomly is not balanced

Open ianoc-stripe opened this issue 8 years ago • 15 comments

groupRandomly uses nextInt(numReducers); this means we get huge reducer skew

ianoc-stripe avatar Feb 13 '17 18:02 ianoc-stripe

@ianoc-stripe Not sure I totally understand. Are you saying we should just use nextInt() to distribute it evenly over the Integer space?

benpence avatar Feb 13 '17 18:02 benpence

Exactly! Otherwise we hash the reduced integer space and then overload a few keys.

ianoc-stripe avatar Feb 13 '17 19:02 ianoc-stripe

I think this is a birthday-problem issue. I think we want at least something like numReducers * numReducers.

johnynek avatar Feb 13 '17 19:02 johnynek
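The birthday-problem effect here is easy to simulate locally (a standalone sketch, not scalding code): drawing numReducers random values from a space of size numReducers produces many collisions, while a space of size numReducers squared makes collisions rare:

```scala
import scala.util.Random

val n = 100
val rng = new Random(42L)

// Draw n random ids from a space of size `space` and count how many
// distinct ids we actually hit; collisions show up as a lower count.
def distinctDraws(space: Int): Int =
  Vector.fill(n)(rng.nextInt(space)).distinct.size

val small = distinctDraws(n)     // space = n: many collisions (expect ~63 of 100)
val large = distinctDraws(n * n) // space = n^2: collisions are rare (expect ~100)

println(s"space=n:   $small distinct ids out of $n draws")
println(s"space=n^2: $large distinct ids out of $n draws")
```

With n reducers, many colliding ids means several reducers receive no group at all while others receive two or more, which is exactly the observed skew.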

Is there any downside to the full integer space?

ianoc-stripe avatar Feb 13 '17 19:02 ianoc-stripe

Well, the groups will each have fewer entries. Ideally, we could apply a mapValueStream to a bigger block of them for some use cases.

I think the ideal would be 1 group per reducer, but doing this randomly creates skew.

johnynek avatar Feb 13 '17 20:02 johnynek

Fair. I'm not sure how the distribution over reducers would look at n^2, or how common that use case is. If n^2 covers it, we might as well use that for the bound.

ianoc-stripe avatar Feb 13 '17 20:02 ianoc-stripe

Yeah, I'm not too uptight about it, but having fewer groups seems like it could be useful. Actually, I can't replicate the skew in a simple REPL session:

scala> def bucket(n: Int, k: Int, recs: Int, seed: Int): Vector[Int] = {
     |   val rng = new java.util.Random(seed)
     |   val m = (0 until recs).iterator.map { _ => rng.nextInt(k) }.toList.groupBy(_ % n).mapValues(_.size)
     |   (0 until n).map(m(_)).toVector
     | }

scala> def spread(vs: Vector[Int]) = vs.max - vs.min
spread: (vs: Vector[Int])Int

scala> spread(bucket(10, 10, 1000, 123))
res1: Int = 47

scala> spread(bucket(10, 10, 10000, 123))
res2: Int = 127

scala> spread(bucket(10, 100, 10000, 123))
res3: Int = 127

did I make some silly error?

johnynek avatar Feb 14 '17 01:02 johnynek

Yep -- well, your code relies on the fact that integer hash codes on the JVM are the identity function.

def bucket(n: Int, k: Int, recs: Int, seed: Int): Vector[Int] = {
  val rng = new java.util.Random(seed)
  val m = (0 until recs).iterator
    .map { _ => val l = rng.nextInt(k); (l, l.toString) }
    .toList
    .groupBy(_.hashCode % n)
    .mapValues(_.size)
  (0 until n).map(s => m.getOrElse(s, 0)).toVector
}

scala> spread(bucket(10, 10, 10000, 123))
res3: Int = 2000

scala> spread(bucket(10, 10, 1000, 123))
res4: Int = 198

ianoc-stripe avatar Feb 14 '17 02:02 ianoc-stripe

@ianoc @johanoskarsson @benpence

I've been struggling with the same issue as well! I notice that some reducers get much more data than others (some reducers don't get any data at all!). Wondering if you have a better solution than TypedPipe.groupRandomly()?

In my case, I have to make around 50K partitions using groupRandomly on a very large TypedPipe.

Thanks in advance for your time!

noveens avatar Jul 22 '21 22:07 noveens

If you look at groupRandomly(buckets), it is basically the same as:

import scala.util.Random

def groupRandomly[A](tp: TypedPipe[A], buckets: Int): Grouped[Int, A] = {
  lazy val rng = new Random(123L)
  tp.groupBy { a =>
    val raw = rng.nextInt(buckets) + a.hashCode()
    val mod = raw % buckets
    if (mod >= 0) mod else mod + buckets
  }
  .withReducers(buckets)
}

So you could change it to anything you want. For instance, you can try this:

def groupSequentially[A](tp: TypedPipe[A], buckets: Int): Grouped[Int, A] = {
  var idx = 0
  tp.groupBy { _ =>
    val mod = idx % buckets
    idx += 1
    if (mod >= 0) mod else mod + buckets
  }
  .withReducers(buckets)
}

which would do a round robin style from one to the next.
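On a local collection, the round-robin assignment is exactly balanced: every bucket gets either floor(recs/buckets) or ceil(recs/buckets) records (a standalone sketch of the same idea, not the Grouped API):

```scala
// Round-robin bucket assignment over a local collection; mirrors the
// `var idx` closure in groupSequentially above.
def roundRobinCounts(recs: Int, buckets: Int): Vector[Int] = {
  var idx = 0
  val counts = (0 until recs).toList
    .groupBy { _ =>
      val b = idx % buckets
      idx += 1
      b
    }
    .map { case (b, vs) => b -> vs.size }
  (0 until buckets).map(b => counts.getOrElse(b, 0)).toVector
}

val cs = roundRobinCounts(1000, 7)
println(cs)              // Vector(143, 143, 143, 143, 143, 143, 142)
println(cs.max - cs.min) // the spread is at most 1
```

Note that in a distributed job each mapper would have its own counter, so the balance holds per mapper rather than globally, but the per-reducer totals still even out.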

You could also just drop the modulus in the original example:

import scala.util.Random

def groupRandomly[A](tp: TypedPipe[A], buckets: Int): Grouped[Int, A] = {
  lazy val rng = new Random(123L)
  tp
    .groupBy { a => rng.nextInt() + a.hashCode() }
    .withReducers(buckets)
}

Which I think is probably fine... I'm not really sure of the motivation for the modulus, but we have been reluctant to make a change...

I think the bug here should probably be fixed by just removing the modulus, since the modulus assumes you are not going to get collisions in your hash. But I guess we are hashing the bucket number, there are collisions in that hashing function, and that's what creates the lumpiness.

It may have been worse in the past, when we might have used a different hashing function there, but we never had tests for this lumpiness.

This isn't an intrinsic operation of scalding or hadoop, so you can implement your own version.

johnynek avatar Jul 23 '21 01:07 johnynek

The modulo, iirc, was because spreading the key over the full range creates a separate key for ~every record as far as hadoop/scalding is concerned, which means some things like iterator operations or mapValues can be a bit slower, iirc?

ianoc avatar Jul 23 '21 02:07 ianoc
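That key-count difference is easy to see on a local collection (a standalone sketch, not scalding code): with the modulus there are at most `buckets` distinct keys, while spreading over the full Int range makes nearly every record its own key, and therefore its own group on the reduce side:

```scala
import scala.util.Random

val buckets = 10
val recs = 10000
val rng = new Random(123L)

// With the modulus: keys are drawn from [0, buckets), so at most
// `buckets` distinct keys exist no matter how many records there are.
val modKeys  = (0 until recs).map(_ => rng.nextInt(buckets)).distinct.size

// Without the modulus: keys span the full Int range, so nearly every
// record becomes its own key (and its own group).
val fullKeys = (0 until recs).map(_ => rng.nextInt()).distinct.size

println(s"with modulus:   $modKeys distinct keys")  // at most `buckets`
println(s"full Int range: $fullKeys distinct keys") // close to `recs`
```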

Thanks for sharing @johnynek !

To be honest, I don't quite understand why you need to add a.hashCode() when computing .groupBy { a => rng.nextInt() + a.hashCode() } in the last function you mentioned. Also, wouldn't you need to finally mod by buckets to get a group number between 0 and buckets - 1? Something like this:

import scala.util.Random

def groupRandomly[A](tp: TypedPipe[A], buckets: Int): Grouped[Int, A] = {
  lazy val rng = new Random(123L)
  tp
    // nextInt(Integer.MAX_VALUE) returns only non-negative random numbers
    .groupBy { _ => rng.nextInt(Integer.MAX_VALUE) % buckets }
    .withReducers(buckets)
}

Thanks again for your time @johnynek , really appreciate it!

noveens avatar Jul 23 '21 03:07 noveens

The motivation for the hashCode is so that two separate mappers wouldn't bucket in the same sequence: the randomness also depends on the input data, which differs between mappers.

But you must seed it, or repeated runs will give different output.

johnynek avatar Jul 23 '21 03:07 johnynek
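A standalone sketch of that combination (the key function here is illustrative, not the actual groupRandomly internals): the fixed seed makes the keys reproducible across runs, while the hashCode term makes the sequence depend on the records each mapper sees:

```scala
import scala.util.Random

// Hypothetical key function: seeded RNG plus the record's hashCode,
// normalized into [0, buckets).
def keys[A](data: Seq[A], buckets: Int, seed: Long): Seq[Int] = {
  val rng = new Random(seed)
  data.map { a =>
    val mod = (rng.nextInt(buckets) + a.hashCode()) % buckets
    if (mod >= 0) mod else mod + buckets
  }
}

val run1 = keys(Seq("a", "b", "c"), 4, seed = 123L)
val run2 = keys(Seq("a", "b", "c"), 4, seed = 123L)
println(run1 == run2) // true: same seed and same data give the same keys every run

val other = keys(Seq("x", "y", "z"), 4, seed = 123L)
println(other)        // generally differs from run1: the data shifts the sequence
```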

I see, makes sense!

noveens avatar Jul 23 '21 03:07 noveens

The modulo, iirc, was because spreading the key over the full range creates a separate key for ~every record as far as hadoop/scalding is concerned, which means some things like iterator operations or mapValues can be a bit slower, iirc?

Ahh yeah, that makes sense... we actually want the fewest possible groups to maximize the efficiency of the iterator-based reduce on the other side...

In theory that makes sense, I wonder how often that is an issue...

In any case, the real fix, I think, is to avoid rehashing the bucket here rather than treating it as a plain Int. For instance, maybe the real fix is to make a

case class Bucket(toInt: Int) 
object Bucket {
  implicit val ordSer: OrderedSerialization[Bucket] = ...
}

where the hashCode on Bucket and the ordered serialization just use the bucket id directly. I think using:

https://github.com/twitter/scalding/blob/b6d465f6fc308d66e5a53c62662d5860846d8dc4/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Hasher.scala#L74

uses murmur hashing on the Int, which we don't actually want here.

johnynek avatar Jul 26 '21 19:07 johnynek
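A minimal sketch of that idea (the OrderedSerialization instance stays elided, as above): overriding hashCode to return the bucket id itself means the partitioner's hash-then-mod step maps bucket i straight to partition i % numReducers, with no murmur mixing and so no collisions among the bucket ids:

```scala
// Wrapper whose hashCode is the bucket id itself, so hash-then-mod
// partitioning sends bucket i directly to partition i % numReducers.
// (The OrderedSerialization instance is elided, as in the comment above.)
case class Bucket(toInt: Int) {
  override def hashCode: Int = toInt
}

val ids = (0 until 10).map(Bucket(_))
println(ids.map(b => b.hashCode % 10)) // 0 through 9: one bucket per partition
```

Because case-class equality is still structural, two Bucket values with the same id remain equal, so the hashCode/equals contract is preserved.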