spark icon indicating copy to clipboard operation
spark copied to clipboard

Utility function to get a setup & cleanup function for mapping each partition

Open squito opened this issue 12 years ago • 17 comments

Often when mapping some RDD, you want to do a bit of setup before processing each partition, followed by cleanup at the end of the partition; this adds utility functions to make that easier.

I felt that this is worth including because its a little tricky to get right -- I needed to add a "CleanupIterator", and I have an example in the unit tests of how this fails w/out it. OTOH, I wasn't sure if this necessarily belongs in the spark api itself (eg., do we add a version of foreach with this also?)

We find it a useful addition, and so thought others might also ...

squito avatar Feb 08 '13 21:02 squito

I cleaned up the style issues, sorry about that. I left that "failing" test in there ... happy to remove it if you want me to, just wanted to be clear if you want me to document the trouble w/ mapPartitions somewhere

squito avatar Feb 08 '13 22:02 squito

Hi Imran,

Have you seen the TaskContext and addOnCompleteCallback? That is what HadoopRDD uses to close the FileStream after all of the lines in a Hadoop file have been read.

You might be able to do what you're doing with just a custom RDD that did something like:

override def compute(s: Split, context: TaskContext): Iterator[(T, U)] = {
   setupDbConnection()
   context.addOnCompleteCallback { tearDownDbConnection()
   // call parent rdd or do own compute stuff

I believe this will achieve the same thing, as compute will be called on each partition, and you'll have start/stop hooks around the execution on each partition.

stephenh avatar Feb 09 '13 16:02 stephenh

I agree with Stephen here. The addOnCompleteCallback mechanism also makes sure to call your handler if the task throws an exception, which is important.

Also, can you add a similar method in the Java API? I guess you would need to create an interface in Java for the setup/close stuff too.

mateiz avatar Feb 09 '13 18:02 mateiz

good point, I definitely hadn't thought about ensuring cleanup w/ exceptions.

I've updated it to use onCompleteCallback. I also added it to the java api -- I added separate classes for PairRDDs & DoubleRDDs, dunno if there is a better way to do that.

squito avatar Feb 11 '13 05:02 squito

I wonder if this could be done with something more like decoration:

val rdd = sc.textFile(...).setupPartitions(dbSetupLogic).mapPartitions(...).cleanupPartitions(dbCloseLogic)

So there would be two new RDDs, PartitionSetupRDD that first invoked its setup function once/partition, then called firstParent.compute, and then PartitionCleanupRDD, that setup the complete callback for its cleanup function.

Not sure if the decoupling would lead to unintended/nonsensical use cases. But, just musing, then perhaps they could be used separately, if you only need one or the other, or without the map, which PartitionMapper currently forces you do to.

Also, I just like that this would use plain functions and not a new "PartitionMapper" interface--for some reason that doesn't feel quite right, but I can't think of a better name.

I see what you're trying to do though.

stephenh avatar Feb 11 '13 05:02 stephenh

Regarding Stephen's comment -- I think it's better to keep PartitionMapper a single object instead of doing functions, in case you need to share state among the setup, map and clean methods (e.g. you open some external resource, use it in your map, then close it).

mateiz avatar Feb 11 '13 06:02 mateiz

Ah, good point. That makes sense.

stephenh avatar Feb 11 '13 08:02 stephenh

I updated JavaPairPartitionMapper, per Josh's suggestion. (We lose the ability for map to throw an exception, but that is already the case for the basic PartitionMapper.)

I tried doing the same thing for JavaDoubleRDD, but somehow I got stuck with weird manifest errors. First it complained:

[error] found : ClassManifest[scala.Double] [error] required: ClassManifest[java.lang.Double]

Then when I switched to explicitly using a java.lang.Double manifest, it reversed:

[error] found : spark.RDD[java.lang.Double] [error] required: spark.RDD[scala.Double]

so I just left it as is

squito avatar Feb 12 '13 16:02 squito

OK I think this is ready to go now. I got rid of the need for the helper object for JavaDoubleRDD, by just casting from java.lang.Double to scala.Double, and it seems happy. Also I put a throws Exception declaration on map in PartitionMapper, for the java api.

squito avatar Feb 13 '13 06:02 squito

Just curious what the status is on this -- waiting for some additional changes here, decided against merging it, or just haven't gotten to it yet.

squito avatar Mar 16 '13 17:03 squito

Sorry, just hadn't had a chance to look at it. It looks good but I made two small comments.

mateiz avatar Mar 16 '13 18:03 mateiz

thanks! I've updated to take those comments into account.

squito avatar Mar 16 '13 21:03 squito

wow, somhow I totally missed committing changes to one file before ... hope you didn't waste time looking at it before, now its actually all there

squito avatar Mar 20 '13 06:03 squito

Can one of the admins verify this patch?

AmplabJenkins avatar Apr 04 '13 21:04 AmplabJenkins

I'm the Jenkins test bot for the UC, Berkeley AMPLab. I've noticed your pull request and will test it once an admin authorizes me to. Thanks for your submission!

AmplabJenkins avatar Apr 10 '13 20:04 AmplabJenkins

I'm the Jenkins test bot for the UC, Berkeley AMPLab. I've noticed your pull request and will test it once an admin authorizes me to. Thanks for your submission!

AmplabJenkins avatar Apr 18 '13 22:04 AmplabJenkins

Thank you for your pull request. An admin will review this request soon.

AmplabJenkins avatar Aug 05 '13 21:08 AmplabJenkins