incubator-uniffle
[Improvement] Fine-grained shuffle manager fallback at the shuffle level
Motivation
Currently, Uniffle's delegation shuffle manager supports falling back to the sort shuffle manager only at the app level. We could implement the fallback at the shuffle level.
Could you help check this feature? @jerqi
Spark has exchange reuse, which may make fine-grained shuffle manager fallback very complex.
Sorry, maybe you misunderstood my idea. I hope that if a Spark job needs to do two shuffles, each shuffle can use a different shuffle manager.
- Shuffle-ID-0 uses SortShuffleManager, because the access policy finds the server nodes unhealthy.
- Shuffle-ID-1 uses RSS, because there are enough server nodes.
Does this feature affect Spark exchange reuse?
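The per-shuffle choice described above can be sketched without Spark as follows. This is only an illustration under stated assumptions: `PerShuffleChoiceSketch`, `Backend`, and the `clusterHasEnoughServers` check are hypothetical names standing in for the delegation manager and the access policy, not Uniffle APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical, Spark-free sketch: picks a backend per shuffle ID at registration time.
final class PerShuffleChoiceSketch {
    enum Backend { RSS, SORT }

    // Stand-in for the access policy check (assumption, not a Uniffle API).
    private final BooleanSupplier clusterHasEnoughServers;
    private final Map<Integer, Backend> chosen = new LinkedHashMap<>();

    PerShuffleChoiceSketch(BooleanSupplier clusterHasEnoughServers) {
        this.clusterHasEnoughServers = clusterHasEnoughServers;
    }

    // Decide once per shuffle; later calls for the same ID return the recorded decision.
    synchronized Backend register(int shuffleId) {
        return chosen.computeIfAbsent(shuffleId,
            id -> clusterHasEnoughServers.getAsBoolean() ? Backend.RSS : Backend.SORT);
    }

    public static void main(String[] args) {
        boolean[] healthy = {false};
        PerShuffleChoiceSketch manager = new PerShuffleChoiceSketch(() -> healthy[0]);
        System.out.println("shuffle 0 -> " + manager.register(0)); // cluster unhealthy
        healthy[0] = true;
        System.out.println("shuffle 1 -> " + manager.register(1)); // cluster healthy
        System.out.println("shuffle 0 again -> " + manager.register(0)); // decision is sticky
    }
}
```

In this sketch the decision for a shuffle never changes after registration, so a reused Shuffle-ID-0 stays on the sort backend even if the cluster becomes healthy later.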
When we reuse Shuffle-ID-1 and RSS no longer has enough server nodes, what will happen?
I think it should fail, because the shuffle data is on dead server nodes.
By exchange reuse, do you mean sharing the written shuffle data among multiple downstream shuffle readers?
Yes. It also influences dynamic allocation. SortShuffleManager needs ESS (the external shuffle service) to support dynamic allocation, but RssShuffleManager can't work with ESS.
I have a question: why would it influence dynamic allocation? Could you give more detail?
I think a shuffle dependency holds a shuffle handle (an RSS or sort handle), which is decided by the registerShuffle method. This means a shuffle is managed by only one shuffle manager (RSS or sort), which serves both its writers and its readers. There is no cooperation between the sort and RSS managers.
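To make this argument concrete, here is a minimal, Spark-free sketch of routing by handle type. All names in it (`HandleDispatchSketch`, `Handle`, `RssHandle`, `SortHandle`) are hypothetical stand-ins for Spark's `ShuffleHandle` and its RSS and sort variants. It only shows that once a handle is created at registration, writers and readers for that shuffle are always served by the same backend.

```java
// Hypothetical, Spark-free sketch: the handle created at registration decides who serves the shuffle.
final class HandleDispatchSketch {
    interface Handle { int shuffleId(); }

    static final class RssHandle implements Handle {
        private final int id;
        RssHandle(int id) { this.id = id; }
        public int shuffleId() { return id; }
    }

    static final class SortHandle implements Handle {
        private final int id;
        SortHandle(int id) { this.id = id; }
        public int shuffleId() { return id; }
    }

    // The backend is fixed here, once, when the shuffle is registered.
    static Handle registerShuffle(int shuffleId, boolean useRss) {
        return useRss ? new RssHandle(shuffleId) : new SortHandle(shuffleId);
    }

    // Writers and readers are routed purely by the handle type.
    static String backendOf(Handle handle) {
        return handle instanceof RssHandle ? "rss" : "sort";
    }

    static String getWriter(Handle handle) {
        return backendOf(handle) + " writer for shuffle " + handle.shuffleId();
    }

    static String getReader(Handle handle) {
        return backendOf(handle) + " reader for shuffle " + handle.shuffleId();
    }

    public static void main(String[] args) {
        Handle h0 = registerShuffle(0, false);
        Handle h1 = registerShuffle(1, true);
        System.out.println(getWriter(h0));
        System.out.println(getReader(h0));
        System.out.println(getWriter(h1));
        System.out.println(getReader(h1));
    }
}
```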
Regarding "SortShuffleManager needs ESS to support dynamic allocation, but RssShuffleManager can't work with ESS": I don't understand what this means.
Attached is a draft implementation.
Code Draft
public class DelegationRssShuffleManager implements ShuffleManager {
  // Draft only: imports, tryAccessCluster(), and the remaining ShuffleManager methods are omitted.
  private static final Logger LOG = LoggerFactory.getLogger(DelegationRssShuffleManager.class);

  private final SparkConf sparkConf;
  // IDs of the shuffles that fell back to SortShuffleManager.
  private final Set<Integer> shuffleIdsUsingSort = ConcurrentHashMap.newKeySet();
  private ShuffleManager rssShuffleManager;
  private ShuffleManager sortShuffleManager;

  public DelegationRssShuffleManager(SparkConf sparkConf, boolean isDriver) throws Exception {
    this.sparkConf = sparkConf;
    // Don't initialize the shuffle managers in the constructor; create them lazily.
  }

  // Returns the requested manager, creating it on first use.
  // If the RSS manager cannot be created, falls through to the sort manager.
  private synchronized ShuffleManager createOrGetShuffleManager(boolean useRss) {
    if (useRss) {
      if (rssShuffleManager != null) {
        return rssShuffleManager;
      } else {
        try {
          final ShuffleManager shuffleManager = new RssShuffleManager(sparkConf, true);
          sparkConf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
          LOG.info("Use RssShuffleManager");
          this.rssShuffleManager = shuffleManager;
          return rssShuffleManager;
        } catch (Exception exception) {
          LOG.warn("Fail to create RssShuffleManager, fallback to SortShuffleManager {}", exception.getMessage());
        }
      }
    }
    if (sortShuffleManager == null) {
      try {
        final ShuffleManager shuffleManager =
            RssSparkShuffleUtils.loadShuffleManager(Constants.SORT_SHUFFLE_MANAGER_NAME, sparkConf, true);
        LOG.info("Use SortShuffleManager");
        this.sortShuffleManager = shuffleManager;
      } catch (Exception e) {
        throw new RssException(e.getMessage());
      }
    }
    return sortShuffleManager;
  }

  // The manager is chosen per shuffle, at registration time.
  @Override
  public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<K, V, C> dependency) {
    ShuffleManager shuffleManager = createOrGetShuffleManager(tryAccessCluster());
    if (!(shuffleManager instanceof RssShuffleManager)) {
      shuffleIdsUsingSort.add(shuffleId);
    }
    return shuffleManager.registerShuffle(shuffleId, dependency);
  }

  // Writers are routed by the type of the handle returned from registerShuffle.
  @Override
  public <K, V> ShuffleWriter<K, V> getWriter(
      ShuffleHandle handle,
      long mapId,
      TaskContext context,
      ShuffleWriteMetricsReporter metrics) {
    ShuffleManager shuffleManager = createOrGetShuffleManager(handle instanceof RssShuffleHandle);
    return shuffleManager.getWriter(handle, mapId, context, metrics);
  }

  // Readers are routed the same way as writers.
  @Override
  public <K, C> ShuffleReader<K, C> getReader(
      ShuffleHandle handle,
      int startPartition,
      int endPartition,
      TaskContext context,
      ShuffleReadMetricsReporter metrics) {
    ShuffleManager shuffleManager = createOrGetShuffleManager(handle instanceof RssShuffleHandle);
    return shuffleManager.getReader(handle, startPartition, endPartition, context, metrics);
  }
}
You can see https://github.com/apache/incubator-uniffle/blob/master/spark-patches/spark-2.4.6_dynamic_allocation_support.patch