scalding
NullSink can't be used more than once in a flow
We should 1) not have a singleton object NullSink, since each sink needs to be distinct, and 2) give each NullTap a different identifier:
https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/Source.scala#L285
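To illustrate why the shared identifier matters, here is a minimal toy sketch. It assumes that sinks are compared by their identifier, and it does not use Cascading or Scalding at all: `ToyTap`, `ToyTaps`, and the `"nullTap"` literal are hypothetical names invented for this example.

```scala
import java.util.UUID

// Toy stand-in for a tap; this is NOT Cascading's Tap class.
// Assumption for this sketch: two taps are equal when their identifiers match.
final class ToyTap(val identifier: String) {
  override def equals(other: Any): Boolean = other match {
    case that: ToyTap => identifier == that.identifier
    case _            => false
  }
  override def hashCode: Int = identifier.hashCode
}

object ToyTaps {
  // Models the problem: every null tap shares one fixed identifier.
  def fixedId(): ToyTap = new ToyTap("nullTap")

  // Models the proposal: every null tap gets its own identifier.
  def uniqueId(): ToyTap = new ToyTap("nullTap-" + UUID.randomUUID().toString)

  // Number of distinct sinks that a set-based registry would see.
  def distinctCount(taps: Seq[ToyTap]): Int = taps.toSet.size
}
```

Under that assumption, two taps from `fixedId()` collapse into a single sink, while two taps from `uniqueId()` stay distinct, which is the property the proposal asks for.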
We've been using this for quite a long time internally at Tapad:
package com.tapad.scalding

import java.io._
import java.util.Properties

import cascading.tap.Tap
import com.twitter.scalding._
import org.apache.hadoop.mapred.{JobConf, OutputCollector, RecordReader}

import scala.util.Random

/**
 * If you need to write multiple pipes to NullSource in a single job, use this instead!
 *
 *   import com.twitter.scalding._
 *
 *   class OddsAndEvens(args: Args) extends Job(args) {
 *     val input = IterableSource(Seq(0, 1, 2, 3), 'num)
 *
 *     val odds = input.filter('num)((_ % 2 != 0): Int => Boolean)
 *     val evens = input.filter('num)((_ % 2 == 0): Int => Boolean)
 *
 *     odds.debug.write(new ReusableNullSource)
 *     evens.debug.write(new ReusableNullSource)
 *   }
 */
class ReusableNullSource extends Source {
  private def nullTap[A, B, C, D, E] = new NullTap[A, B, C, D, E] {
    override val getIdentifier = Random.alphanumeric.take(32).mkString
  }

  override def createTap(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_, _, _] = {
    readOrWrite match {
      case Read => throw new Exception("not supported, reading from null")
      case Write => mode match {
        case Hdfs(_, _) => nullTap[JobConf, RecordReader[_, _], OutputCollector[_, _], Any, Any]
        case Local(_) => nullTap[Properties, InputStream, OutputStream, Any, Any]
        case Test(_) => nullTap[Properties, InputStream, OutputStream, Any, Any]
      }
    }
  }
}
Actually, I just came to check whether this had finally been resolved upstream and saw that this issue was opened just a few days ago :)
Yeah, I had to make something similar. I didn't know about the issue before, since I hadn't noticed this debug pattern.