
IndexedRDD for accelerated joins

Open jegonzal opened this issue 12 years ago • 8 comments

In many applications (especially graph computation and machine learning) we are iteratively joining model parameters (vertices) with data (edges). In these cases it can be beneficial to pre-organize the records within each partition to share a similar structure.

Logically, an IndexedRDD[K,V] extends RDD[(K,V)] and provides the same functionality. An IndexedRDD is constructed using the PairRDDFunctions.indexed method:

val tbl: IndexedRDD[String, Int] = sc.parallelize(Array( ("1",1), ("1", -1), ("3",3), ("5",5) )).indexed() 

The index tbl.index can then be applied to other RDDs that share the same key set:

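val tbl: IndexedRDD[String, Int] = sc.parallelize(Array( ("1",1), ("1", -1), ("3",3), ("5",5) )).indexed()
// Reuse tbl's index so tbl2's values are laid out identically; the 1.0f literals match the Float value type
val tbl2: IndexedRDD[String, Float] = sc.parallelize(Array( ("1",1.0f), ("3",3.0f), ("5",5.0f) )).indexed(tbl.index)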

By sharing the same index across multiple RDDs, we guarantee that the values within each RDD are organized identically, enabling fast join operations. Furthermore, because the index is prematerialized (and cached), we can use it to accelerate join operations with RDDs of type RDD[(K,V)] that have not been indexed.
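The layout idea can be modeled on a single partition in plain Scala, without Spark. This is a minimal sketch under stated assumptions: an index is taken to be an immutable map from each distinct key to a slot, and an indexed dataset stores, at each slot, all values for that key (so the duplicate key "1" above keeps both 1 and -1). `LocalIndex`, `buildIndex`, `reindex`, `joinAligned`, and `joinUnindexed` are hypothetical names, not the API in this PR.

```scala
// Hypothetical single-partition model of an index; not the PR's actual API.
final case class LocalIndex[K](slots: Map[K, Int]) {
  def size: Int = slots.size
  // Reverse lookup: slot position -> key.
  lazy val keyAt: Map[Int, K] = slots.map(_.swap)
}

// Assign each distinct key a slot, in first-seen order.
def buildIndex[K](keys: Seq[K]): LocalIndex[K] =
  LocalIndex(keys.distinct.zipWithIndex.toMap)

// Lay out values according to an existing index: slot i holds every value
// whose key maps to i. Keys that the index does not know are rejected.
def reindex[K, V](index: LocalIndex[K], pairs: Seq[(K, V)]): Vector[Seq[V]] = {
  val grouped: Map[K, Seq[V]] =
    pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
  require(grouped.keySet.subsetOf(index.slots.keySet), "key not present in index")
  Vector.tabulate(index.size)(i => grouped.getOrElse(index.keyAt(i), Seq.empty))
}

// Two layouts built from the same index line up slot by slot, so an inner
// join pairs values by position instead of hashing the right-hand keys.
def joinAligned[K, V, W](index: LocalIndex[K], left: Vector[Seq[V]],
                         right: Vector[Seq[W]]): Seq[(K, (V, W))] =
  left.indices.flatMap { i =>
    for (v <- left(i); w <- right(i)) yield (index.keyAt(i), (v, w))
  }

// A plain, unindexed collection can still reuse the prebuilt index: each
// record's key is resolved to its slot with a single map lookup.
def joinUnindexed[K, V, W](index: LocalIndex[K], left: Vector[Seq[V]],
                           other: Seq[(K, W)]): Seq[(K, (V, W))] =
  other.flatMap { case (k, w) =>
    index.slots.get(k).toSeq.flatMap(i => left(i).map(v => (k, (v, w))))
  }
```

In this model `joinAligned` never looks up the right-hand keys at all, while `joinUnindexed` pays one lookup per incoming record but no index construction.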

jegonzal, Aug 19 '13 20:08

Thank you for your pull request. An admin will review this request soon.

AmplabJenkins, Aug 19 '13 20:08

This is still a prototype but I wanted to open the discussion on the design so I can incorporate feedback early.

jegonzal, Aug 19 '13 20:08

I have not looked into this in great detail, but shouldn't cogroup etc. (anything using multiple RDDs) validate that the partitioners are equal before comparing partition indices and the like? Or is the GitHub mobile site messing with me!

Regards,
Mridul

On Aug 20, 2013 1:50 AM, "Joey" wrote:


You can merge this Pull Request by running

git pull https://github.com/jegonzal/spark indexed_rdd

Or view, comment on, or merge it at:

https://github.com/mesos/spark/pull/848

Commit Summary

  • second indexedrdd design
  • Finished early prototype of IndexedRDD
  • Adding testing code for indexedrdd
  • IndexedRDD passes all PairRDD Function tests
  • adding better error handling when indexing an RDD
  • changing caching behavior on indexedrdds
  • Merge branch 'master' of https://github.com/mesos/spark into indexed_rdd2
  • Merged with changes to zipPartitions
  • Corrected all indexed RDD tests.
  • Merge branch 'master' of https://github.com/mesos/spark into indexed_rdd

File Changes

  • M core/src/main/scala/spark/PairRDDFunctions.scala (11) https://github.com/mesos/spark/pull/848/files#diff-0
  • M core/src/main/scala/spark/RDD.scala (9) https://github.com/mesos/spark/pull/848/files#diff-1
  • A core/src/main/scala/spark/rdd/IndexedRDD.scala (548) https://github.com/mesos/spark/pull/848/files#diff-2
  • A core/src/test/scala/spark/IndexedRDDSuite.scala (460) https://github.com/mesos/spark/pull/848/files#diff-3

Patch Links:

  • https://github.com/mesos/spark/pull/848.patch
  • https://github.com/mesos/spark/pull/848.diff

mridulm, Aug 20 '13 10:08

I have made some organizational changes based on suggestions from @rxin and @mateiz.

To RDD I added:

  def pairRDDFunctions[K, V](
      implicit t: T <:< (K, V), k: ClassManifest[K], v: ClassManifest[V]): 
      PairRDDFunctions[K, V] = {
    new PairRDDFunctions(this.asInstanceOf[RDD[(K,V)]])
  }

This interesting piece of code returns a PairRDDFunctions wrapper for this RDD. By delegating construction of the wrapper to the RDD class, specializations of RDD can return different implementations of PairRDDFunctions. For example, the new IndexedRDD class overrides this method as follows:

  override def pairRDDFunctions[K1, V1](
      implicit t: (K, V) <:< (K1, V1), k: ClassManifest[K1], v: ClassManifest[V1]):
      PairRDDFunctions[K1, V1] = {
    new IndexedRDDFunctions[K1, V1](this.asInstanceOf[IndexedRDD[K1, V1]])
  }

I updated the implicit conversion that constructs PairRDDFunctions so that it calls rdd.pairRDDFunctions.
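The delegation pattern can be shown in miniature. The sketch below is plain Scala with hypothetical stand-ins (`MiniRDD`, `MiniIndexedRDD`, `PairOps`, `IndexedPairOps`, `PairSyntax`) for RDD, IndexedRDD, PairRDDFunctions, and IndexedRDDFunctions. It keeps the `T <:< (K, V)` evidence from the snippets above but drops the ClassManifest parameters for brevity. The point it illustrates: because the implicit conversion calls a virtual method, the wrapper is chosen by the runtime class of the RDD rather than by its static type.

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for PairRDDFunctions: a wrapper adding pair operations.
class PairOps[K, V](val pairs: Seq[(K, V)]) {
  def implementation: String = "generic"
}

// Hypothetical stand-in for IndexedRDDFunctions.
class IndexedPairOps[K, V](pairs: Seq[(K, V)]) extends PairOps[K, V](pairs) {
  override def implementation: String = "indexed"
}

// Hypothetical stand-in for RDD[T].
class MiniRDD[T](val data: Seq[T]) {
  // The evidence `T <:< (K, V)` proves the elements are pairs and converts them.
  def pairOps[K, V](implicit ev: T <:< (K, V)): PairOps[K, V] =
    new PairOps[K, V](data.map(ev))
}

// Hypothetical stand-in for IndexedRDD[K, V]; overriding pairOps swaps the wrapper.
class MiniIndexedRDD[K, V](pairs: Seq[(K, V)]) extends MiniRDD[(K, V)](pairs) {
  override def pairOps[K1, V1](implicit ev: <:<[(K, V), (K1, V1)]): PairOps[K1, V1] =
    new IndexedPairOps[K1, V1](data.map(ev))
}

object PairSyntax {
  // The conversion asks the RDD for its wrapper instead of building one itself.
  implicit def toPairOps[K, V](rdd: MiniRDD[(K, V)]): PairOps[K, V] =
    rdd.pairOps[K, V]
}
```

With `import PairSyntax._` in scope, a `MiniIndexedRDD` held through a `MiniRDD[(String, Int)]` reference still yields `IndexedPairOps`, which is the behavior the override is meant to provide.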

I modified PairRDDFunctions to provide a ClassManifest for intermediate types. This is needed for PairRDD implementations that are array-backed (e.g., IndexedRDD).
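The reason array-backed implementations need a manifest for intermediate types can be seen in a small example. This is a sketch assuming a hypothetical `ArrayValues` class rather than IndexedRDD's real storage, and it uses `scala.reflect.ClassTag`, which replaced `ClassManifest` starting with Scala 2.10.

```scala
import scala.reflect.ClassTag

// Hypothetical array-backed store for the values of one partition.
final class ArrayValues[V](val values: Array[V]) {
  // W is an intermediate type introduced by the operation. Allocating a
  // generic Array[W] on the JVM needs W's runtime class, which the ClassTag
  // context bound supplies; without it, `new Array[W](...)` does not compile.
  def mapValues[W: ClassTag](f: V => W): ArrayValues[W] = {
    val out = new Array[W](values.length)
    var i = 0
    while (i < values.length) {
      out(i) = f(values(i))
      i += 1
    }
    new ArrayValues(out)
  }
}
```

A side benefit of carrying the tag is that primitive element types get primitive arrays (for example `Array[Double]` is a JVM `double[]`), avoiding boxing in tight loops.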

jegonzal, Aug 28 '13 01:08

In response to @mridulm: the IndexedRDD exploits the partitioning of the index. If both RDDs are indexed (IndexedRDDs) and share the same index, then both must be partitioned identically and even arranged identically within each partition, so cogroup can be achieved using zip. If the two RDDs are indexed with different indices, the standard shuffle logic is applied, though the index is still used to help pre-construct hash maps.
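A local sketch of the two paths described above, in plain Scala with hypothetical names (`Indexed`, `cogroup`). It assumes that "the same index" means the same index object, tested with reference equality, and that each side stores one `Seq` of values per index slot. A distributed implementation would shuffle in the fallback path; here a key lookup stands in for that.

```scala
// Hypothetical local model: `slots` maps each key to a position, and
// `values(i)` holds every value for the key at position i.
final class Indexed[K, V](val slots: Map[K, Int], val values: Vector[Seq[V]]) {
  require(values.length == slots.size, "expected one value slot per index entry")
  val keyAt: Map[Int, K] = slots.map(_.swap)
}

def cogroup[K, V, W](a: Indexed[K, V], b: Indexed[K, W]): Map[K, (Seq[V], Seq[W])] =
  if (a.slots eq b.slots) {
    // Same index object: slot i denotes the same key on both sides,
    // so the two value vectors can simply be zipped by position.
    a.values.zip(b.values).zipWithIndex.map { case ((vs, ws), i) =>
      (a.keyAt(i), (vs, ws))
    }.toMap
  } else {
    // Different indices: positions mean nothing across sides, so match by key.
    (a.slots.keySet ++ b.slots.keySet).iterator.map { k =>
      val vs = a.slots.get(k).map(i => a.values(i)).getOrElse(Seq.empty[V])
      val ws = b.slots.get(k).map(i => b.values(i)).getOrElse(Seq.empty[W])
      (k, (vs, ws))
    }.toMap
  }
```

Testing reference identity rather than structural equality keeps the fast-path check O(1), at the cost of missing two separately built but equal indices; that trade-off is an assumption of this sketch.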

jegonzal, Aug 28 '13 02:08

To give an example of what I was referring to, take a look at cogroup in IndexedRDDFunctions:

self and other IndexedRDDFunctions need not be partitioned by the same partitioner, so direct comparisons of index values in the case match could give incorrect results?

Unfortunately, I do not have time to go over this PR, so I will defer to someone else to comment on this further.
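The partitioning concern can be illustrated with a small model in plain Scala, without Spark. A dataset is a vector of partitions plus a hypothetical `HashPartitioning(numPartitions)` descriptor; all names here are assumptions, only loosely analogous to Spark's `Partitioner`. Pairing partition i of one side with partition i of the other is safe only when both descriptors are equal; otherwise this sketch repartitions the right-hand side first.

```scala
// Hypothetical descriptor of how keys were assigned to partitions.
final case class HashPartitioning(numPartitions: Int) {
  require(numPartitions > 0, "numPartitions must be positive")
  def partitionOf(key: Any): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod // keep the result non-negative
  }
}

final case class Partitioned[K, V](partitioning: HashPartitioning, parts: Vector[Seq[(K, V)]])

// Place each record in the partition its key hashes to.
def partition[K, V](p: HashPartitioning, data: Seq[(K, V)]): Partitioned[K, V] = {
  val grouped = data.groupBy { case (k, _) => p.partitionOf(k) }
  Partitioned(p, Vector.tabulate(p.numPartitions)(i => grouped.getOrElse(i, Seq.empty)))
}

// Zip partition i of each side only when both used the same partitioning;
// otherwise a key could sit in different partition numbers on each side.
def zipPartitionsChecked[K, V, W, R](a: Partitioned[K, V], b: Partitioned[K, W])(
    f: (Seq[(K, V)], Seq[(K, W)]) => Seq[R]): Seq[R] = {
  val aligned =
    if (a.partitioning == b.partitioning) b
    else partition(a.partitioning, b.parts.flatten)
  a.parts.zip(aligned.parts).flatMap { case (pa, pb) => f(pa, pb) }
}
```

Case-class equality on the descriptor plays the role of the partitioner comparison being asked about: two sides built with the same hash function and partition count compare equal and skip the repartition.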

mridulm, Aug 28 '13 03:08

Hey Joey, this looks nice. Do you happen to have some microbenchmarks for the performance gains of indexed versus non-indexed cogroup? Also, you mentioned the applications to graph algorithms. What other operations on RDDs do you think could benefit from having an index in this context?

AndreSchumacher, Aug 28 '13 21:08

Unfortunately, I don't have any good benchmarks for evaluating joins, so I am in the process of creating some. Does anyone have suggestions?

jegonzal, Sep 18 '13 05:09