Mobius

repartitionAndSortWithinPartitions does not repartition at all

Open · alexdeng opened this issue 8 years ago · 3 comments

In the source code, I don't see `partitionFunc` being used anywhere to repartition the RDD. The current code only sorts records within the existing partitions; `numPartitions` and `partitionFunc` are accepted but never used.

```csharp
///
/// Repartition the RDD according to the given partitioner and, within each resulting partition,
/// sort records by their keys.
///
/// This is more efficient than calling repartition and then sorting within each partition
/// because it can push the sorting down into the shuffle machinery.
///
public static RDD<Tuple<K, V>> repartitionAndSortWithinPartitions<K, V>(
    this RDD<Tuple<K, V>> self,
    int? numPartitions = null,
    Func<K, int> partitionFunc = null,
    bool ascending = true)
{
    return self.MapPartitionsWithIndex<Tuple<K, V>>((pid, iter) => ascending
        ? iter.OrderBy(kv => kv.Item1)
        : iter.OrderByDescending(kv => kv.Item1));
}
```
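To make the problem concrete, here is a plain-Python model (no Spark involved) of what the C# method above effectively does: each existing partition is sorted by key, and `num_partitions` / `partition_func` are never consulted. The function name and the list-of-lists representation of partitions are hypothetical, for illustration only, and the way the six sample pairs are split across two input partitions is an assumption.

```python
from typing import Any, Callable, List, Optional, Tuple

Pair = Tuple[Any, Any]


def current_mobius_behavior(
    partitions: List[List[Pair]],
    num_partitions: Optional[int] = None,
    partition_func: Optional[Callable[[Any], int]] = None,
    ascending: bool = True,
) -> List[List[Pair]]:
    """Hypothetical model of the reported code: sort inside existing partitions only.

    num_partitions and partition_func are accepted but ignored, mirroring the bug,
    so no record ever moves to a different partition.
    """
    # sorted() is stable, and reverse=True keeps equal keys in their original order.
    return [sorted(part, key=lambda kv: kv[0], reverse=not ascending) for part in partitions]


# Assumed input split: two partitions holding the sample pairs from the PySpark doctest.
input_partitions = [[(0, 5), (3, 8), (2, 6)], [(0, 8), (3, 8), (1, 3)]]
print(current_mobius_behavior(input_partitions, 2, lambda x: x % 2))
```

Key `0` ends up in both output partitions and even and odd keys stay mixed, even though `lambda x: x % 2` was passed as the partitioner.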

The PySpark function, which I assume this one is modeled on, does repartition correctly. Python code:

```python
def repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=portable_hash,
                                       ascending=True, keyfunc=lambda x: x):
    """
    Repartition the RDD according to the given partitioner and, within each resulting partition,
    sort records by their keys.

    >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
    >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, 2)
    >>> rdd2.glom().collect()
    [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
    """
    if numPartitions is None:
        numPartitions = self._defaultReducePartitions()

    memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
    serializer = self._jrdd_deserializer

    def sortPartition(iterator):
        sort = ExternalSorter(memory * 0.9, serializer).sorted
        return iter(sort(iterator, key=lambda k_v: keyfunc(k_v[0]), reverse=(not ascending)))

    return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
```
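The PySpark version is a two-step composition: `partitionBy` moves every pair to a new partition, then `mapPartitions` sorts each one. Below is a minimal plain-Python sketch of those semantics, assuming (as PySpark's `partitionBy` does, to my understanding) that a pair goes to bucket `partitionFunc(key) % numPartitions`. The function name is hypothetical, and an in-memory `sorted` stands in for PySpark's `ExternalSorter`.

```python
from typing import Any, Callable, List, Tuple

Pair = Tuple[Any, Any]


def repartition_and_sort(
    records: List[Pair],
    num_partitions: int,
    partition_func: Callable[[Any], int],
    ascending: bool = True,
    keyfunc: Callable[[Any], Any] = lambda x: x,
) -> List[List[Pair]]:
    """Hypothetical model: repartition by partition_func, then sort within each partition."""
    # Step 1 (partitionBy): route every pair to partition_func(key) % num_partitions.
    buckets: List[List[Pair]] = [[] for _ in range(num_partitions)]
    for k, v in records:
        buckets[partition_func(k) % num_partitions].append((k, v))

    # Step 2 (mapPartitions(sortPartition)): stable sort of each bucket by keyfunc(key).
    return [sorted(b, key=lambda kv: keyfunc(kv[0]), reverse=not ascending) for b in buckets]


data = [(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)]
print(repartition_and_sort(data, 2, lambda x: x % 2))
```

With the doctest's data and partitioner this reproduces the expected `glom()` output: all even keys land in partition 0 and all odd keys in partition 1, each sorted by key.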

alexdeng · Apr 05 '17 23:04

@alexdeng, thanks for reporting the issue. @xiongrenyi, is this the issue you fixed this week?

skaarthik · Apr 09 '17 03:04

Any update?

alexdeng · Apr 25 '17 04:04

@xiongrenyi, can you comment?

skaarthik · Apr 27 '17 17:04