spark
Utility function to get a setup & cleanup function for mapping each partition
Often when mapping some RDD, you want to do a bit of setup before processing each partition, followed by cleanup at the end of the partition; this adds utility functions to make that easier.
I felt that this is worth including because it's a little tricky to get right -- I needed to add a "CleanupIterator", and I have an example in the unit tests of how this fails without it. OTOH, I wasn't sure if this necessarily belongs in the Spark API itself (e.g., do we add a version of foreach with this also?)
We find it a useful addition, and so thought others might also ...
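To make the tricky part concrete, here is a minimal standalone sketch of the CleanupIterator idea (names and details are illustrative, not the actual patch): the cleanup must be deferred until the partition's iterator is actually exhausted, because mapPartitions returns a lazy iterator -- running cleanup right after building the iterator would close the resource before any element is processed.

```scala
// Sketch: wraps an iterator and invokes `cleanup` exactly once, as soon as
// the underlying iterator reports it has no more elements.
class CleanupIterator[T](sub: Iterator[T], cleanup: () => Unit) extends Iterator[T] {
  private var cleaned = false

  def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !cleaned) {
      cleaned = true  // guard against repeated hasNext calls after exhaustion
      cleanup()
    }
    more
  }

  def next(): T = sub.next()
}
```

For example, wrapping `Iterator(1, 2, 3)` with a cleanup that closes a connection would only fire the cleanup after the third element has been consumed.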
I cleaned up the style issues, sorry about that. I left that "failing" test in there ... happy to remove it if you want me to -- I just wanted to check whether you want me to document the trouble w/ mapPartitions somewhere.
Hi Imran,
Have you seen the TaskContext and addOnCompleteCallback? That is what HadoopRDD uses to close the FileStream after all of the lines in a Hadoop file have been read.
You might be able to do what you're doing with just a custom RDD that did something like:
override def compute(s: Split, context: TaskContext): Iterator[(T, U)] = {
  setupDbConnection()
  context.addOnCompleteCallback { () => tearDownDbConnection() }
  // call parent rdd or do own compute stuff
}
I believe this will achieve the same thing, as compute will be called on each partition, and you'll have start/stop hooks around the execution on each partition.
I agree with Stephen here. The addOnCompleteCallback mechanism also makes sure to call your handler if the task throws an exception, which is important.
Also, can you add a similar method in the Java API? I guess you would need to create an interface in Java for the setup/close stuff too.
good point, I definitely hadn't thought about ensuring cleanup w/ exceptions.
I've updated it to use onCompleteCallback. I also added it to the java api -- I added separate classes for PairRDDs & DoubleRDDs, dunno if there is a better way to do that.
I wonder if this could be done with something more like decoration:
val rdd = sc.textFile(...).setupPartitions(dbSetupLogic).mapPartitions(...).cleanupPartitions(dbCloseLogic)
So there would be two new RDDs: PartitionSetupRDD, which first invoked its setup function once per partition and then called firstParent.compute, and PartitionCleanupRDD, which set up the complete callback for its cleanup function.
Not sure if the decoupling would lead to unintended/nonsensical use cases. But, just musing, then perhaps they could be used separately, if you only need one or the other, or without the map, which PartitionMapper currently forces you to do.
Also, I just like that this would use plain functions and not a new "PartitionMapper" interface--for some reason that doesn't feel quite right, but I can't think of a better name.
I see what you're trying to do though.
Regarding Stephen's comment -- I think it's better to keep PartitionMapper a single object instead of doing functions, in case you need to share state among the setup, map and clean methods (e.g. you open some external resource, use it in your map, then close it).
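A hypothetical sketch of that point (the trait and class names here are illustrative, not the actual patch): because one object owns the resource, setup, map, and cleanup can all reach it through a shared field, which separate standalone functions could not do.

```scala
// Sketch of a PartitionMapper-style interface where the three methods
// share state through the enclosing object.
trait PartitionMapper[T, U] {
  def setup(): Unit
  def map(iter: Iterator[T]): Iterator[U]
  def cleanup(): Unit
}

// Toy example: a lookup "resource" opened in setup, used in map,
// released in cleanup.
class LookupMapper extends PartitionMapper[Int, String] {
  private var table: Map[Int, String] = _  // the shared resource

  def setup(): Unit = { table = Map(1 -> "one", 2 -> "two") }

  def map(iter: Iterator[Int]): Iterator[String] =
    iter.map(k => table.getOrElse(k, "?"))

  def cleanup(): Unit = { table = null }
}
```

With a real external resource (a DB connection, say), setup would open it, map would query it, and cleanup would close it, all through the same field.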
Ah, good point. That makes sense.
I updated JavaPairPartitionMapper, per Josh's suggestion. (We lose the ability for map to throw an exception, but that is already the case for the basic PartitionMapper.)
I tried doing the same thing for JavaDoubleRDD, but somehow I got stuck with weird manifest errors. First it complained:
[error] found   : ClassManifest[scala.Double]
[error] required: ClassManifest[java.lang.Double]
Then when I switched to explicitly using a java.lang.Double manifest, it reversed:
[error] found   : spark.RDD[java.lang.Double]
[error] required: spark.RDD[scala.Double]
so I just left it as is
OK I think this is ready to go now. I got rid of the need for the helper object for JavaDoubleRDD, by just casting from java.lang.Double to scala.Double, and it seems happy. Also I put a throws Exception declaration on map in PartitionMapper, for the java api.
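For reference, the cast works because scala.Double and java.lang.Double box to the same JVM class, so an asInstanceOf between them is safe at runtime even though the compiler treats them as distinct types. A standalone illustration (not the actual patch):

```scala
// scala.Double and java.lang.Double share the same boxed runtime
// representation, so a cast between them unboxes/boxes safely.
val boxed: java.lang.Double = java.lang.Double.valueOf(3.14)
val unboxed: Double = boxed.asInstanceOf[Double]

// The same trick applies element-wise to a collection of boxed doubles:
val javaDoubles: List[java.lang.Double] = List(1.0, 2.0).map(d => java.lang.Double.valueOf(d))
val scalaDoubles: List[Double] = javaDoubles.map(_.asInstanceOf[Double])
```

This sidesteps the manifest mismatch entirely, since no second helper object with its own ClassManifest is needed.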
Just curious what the status is on this -- waiting for some additional changes here, decided against merging it, or just haven't gotten to it yet?
Sorry, just hadn't had a chance to look at it. It looks good but I made two small comments.
thanks! I've updated to take those comments into account.
wow, somehow I totally missed committing changes to one file before ... hope you didn't waste time looking at it before; now it's actually all there
Can one of the admins verify this patch?
I'm the Jenkins test bot for the UC, Berkeley AMPLab. I've noticed your pull request and will test it once an admin authorizes me to. Thanks for your submission!
Thank you for your pull request. An admin will review this request soon.