[Improvement] Fine-grained shuffle manager fallback at the shuffle level

Open zuston opened this issue 3 years ago • 8 comments

Motivation

Currently, Uniffle's delegation shuffle manager supports falling back to the sort shuffle manager at the app level. We could implement the fallback at the shuffle level.

zuston avatar Aug 08 '22 09:08 zuston

Could you help check this feature? @jerqi

zuston avatar Aug 09 '22 02:08 zuston

Spark has exchange reuse, which may make fine-grained shuffle manager fallback very complex.

jerqi avatar Aug 09 '22 02:08 jerqi

Sorry, maybe you misunderstood my idea. I hope that if a Spark job needs to do two shuffles, each shuffle can use a different shuffle manager.

  1. Shuffle-ID-0 uses the SortShuffleManager because the access policy finds the server nodes unhealthy.
  2. Shuffle-ID-1 uses RSS because there are enough server nodes.
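
A minimal, dependency-free sketch of the two-shuffle case above. Every name here (`PerShuffleSelection`, `register`, `managerFor`, and the `healthyServers`/`requiredServers` parameters) is hypothetical and only stands in for whatever the access policy really checks; none of it is Uniffle's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model: remembers which shuffle manager each shuffle ID was registered with. */
final class PerShuffleSelection {
  enum Manager { RSS, SORT }

  private final Map<Integer, Manager> chosen = new LinkedHashMap<>();

  /**
   * Decides once, at registration time. The two counts are an assumed,
   * simplified stand-in for the access policy's health check.
   */
  Manager register(int shuffleId, int healthyServers, int requiredServers) {
    Manager manager = healthyServers >= requiredServers ? Manager.RSS : Manager.SORT;
    chosen.put(shuffleId, manager);
    return manager;
  }

  /** Returns the registration-time choice, regardless of the cluster's current state. */
  Manager managerFor(int shuffleId) {
    Manager manager = chosen.get(shuffleId);
    if (manager == null) {
      throw new IllegalStateException("shuffle " + shuffleId + " was never registered");
    }
    return manager;
  }
}
```

In this model, registering shuffle 0 with too few healthy servers and shuffle 1 with enough leaves the two shuffles of one job on different managers.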

Does this feature affect Spark's exchange reuse?

zuston avatar Aug 09 '22 02:08 zuston

If RSS doesn't have enough server nodes when we reuse Shuffle-ID-1, what will happen?

jerqi avatar Aug 09 '22 02:08 jerqi

> If RSS doesn't have enough server nodes when we reuse Shuffle-ID-1, what will happen?

I think it should fail, because the shuffle data is on dead server nodes.

By exchange reuse, you mean sharing the written shuffle data among multiple downstream shuffle readers, right?

zuston avatar Aug 09 '22 03:08 zuston

> By exchange reuse, you mean sharing the written shuffle data among multiple downstream shuffle readers, right?

Yes. It also influences dynamic allocation. SortShuffleManager needs ESS (the external shuffle service) to support dynamic allocation, but RssShuffleManager can't work with ESS.

jerqi avatar Aug 09 '22 03:08 jerqi

I have a question: why would it influence dynamic allocation? Could you give more detail?

I think a shuffle dependency holds a shuffle handle (RSS or sort), which is decided in registerShuffle. This means a shuffle is managed by only one shuffle manager (RSS or sort), which serves both its writers and its readers. Sort and RSS never need to cooperate on the same shuffle.
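
To illustrate the point without Spark on the classpath, here is a small sketch in which the handle type alone selects the backend. `HandleDispatch`, `Handle`, `RssHandle`, `SortHandle`, `route` and `open` are hypothetical stand-ins for Spark's ShuffleHandle and the managers, not real classes.

```java
/** Hypothetical model of routing by handle type; not Spark's or Uniffle's API. */
final class HandleDispatch {
  interface Handle {
    int shuffleId();
  }

  static final class RssHandle implements Handle {
    private final int id;
    RssHandle(int id) { this.id = id; }
    @Override public int shuffleId() { return id; }
  }

  static final class SortHandle implements Handle {
    private final int id;
    SortHandle(int id) { this.id = id; }
    @Override public int shuffleId() { return id; }
  }

  /** Picks the backend from the handle type only; no cluster state is consulted. */
  static String route(Handle handle) {
    return handle instanceof RssHandle ? "rss" : "sort";
  }

  /** A writer and any number of readers sharing one handle all reach the same backend. */
  static String open(String role, Handle handle) {
    return role + "@" + route(handle) + "#" + handle.shuffleId();
  }
}
```

Under this model, a reused exchange only means several readers call `open` with the same handle, so they all reach the backend chosen when the shuffle was registered.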

> SortShuffleManager needs ESS to support dynamic allocation, but RssShuffleManager can't work with ESS.

I don't understand what this means.

I've attached a draft implementation.

Code Draft

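public class DelegationRssShuffleManager implements ShuffleManager {

  // Draft only: imports, tryAccessCluster(), and the remaining ShuffleManager
  // methods (unregisterShuffle, shuffleBlockResolver, stop) are omitted.
  private static final Logger LOG = LoggerFactory.getLogger(DelegationRssShuffleManager.class);

  private final SparkConf sparkConf;
  private final boolean isDriver;
  // IDs of the shuffles that fell back to SortShuffleManager.
  private final Set<Integer> shuffleIdsUsingSort = ConcurrentHashMap.newKeySet();

  // Both delegates are created lazily by createOrGetShuffleManager.
  private ShuffleManager rssShuffleManager;
  private ShuffleManager sortShuffleManager;

  public DelegationRssShuffleManager(SparkConf sparkConf, boolean isDriver) throws Exception {
    this.sparkConf = sparkConf;
    this.isDriver = isDriver;
    // Don't initialize the shuffle managers in the constructor.
  }

  // Returns the RSS manager when useRss is true and it can be created;
  // otherwise returns the sort manager.
  private synchronized ShuffleManager createOrGetShuffleManager(boolean useRss) {
    if (useRss) {
      if (rssShuffleManager != null) {
        return rssShuffleManager;
      }
      try {
        // Pass isDriver through instead of hard-coding true: getWriter and
        // getReader call this method on executors.
        final ShuffleManager shuffleManager = new RssShuffleManager(sparkConf, isDriver);
        sparkConf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
        LOG.info("Use RssShuffleManager");
        this.rssShuffleManager = shuffleManager;
        return rssShuffleManager;
      } catch (Exception exception) {
        LOG.warn("Fail to create RssShuffleManager, fallback to SortShuffleManager {}", exception.getMessage());
      }
    }

    if (sortShuffleManager == null) {
      try {
        final ShuffleManager shuffleManager =
            RssSparkShuffleUtils.loadShuffleManager(Constants.SORT_SHUFFLE_MANAGER_NAME, sparkConf, isDriver);
        LOG.info("Use SortShuffleManager");
        this.sortShuffleManager = shuffleManager;
      } catch (Exception e) {
        throw new RssException(e.getMessage());
      }
    }
    return sortShuffleManager;
  }

  @Override
  public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<K, V, C> dependency) {
    // Decide per shuffle: use RSS only if the cluster is accessible right now.
    ShuffleManager shuffleManager = createOrGetShuffleManager(tryAccessCluster());
    if (!(shuffleManager instanceof RssShuffleManager)) {
      shuffleIdsUsingSort.add(shuffleId);
    }
    return shuffleManager.registerShuffle(shuffleId, dependency);
  }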
  
  @Override
  public <K, V> ShuffleWriter<K, V> getWriter(
      ShuffleHandle handle,
      long mapId,
      TaskContext context,
      ShuffleWriteMetricsReporter metrics) {
    ShuffleManager shuffleManager = createOrGetShuffleManager(handle instanceof RssShuffleHandle);
    return shuffleManager.getWriter(handle, mapId, context, metrics);
  }

  @Override
  public <K, C> ShuffleReader<K, C> getReader(
      ShuffleHandle handle,
      int startPartition,
      int endPartition,
      TaskContext context,
      ShuffleReadMetricsReporter metrics) {
    ShuffleManager shuffleManager = createOrGetShuffleManager(handle instanceof RssShuffleHandle);
    return shuffleManager.getReader(handle,
        startPartition, endPartition, context, metrics);
  }
}

zuston avatar Aug 09 '22 06:08 zuston

You can see https://github.com/apache/incubator-uniffle/blob/master/spark-patches/spark-2.4.6_dynamic_allocation_support.patch

jerqi avatar Aug 09 '22 06:08 jerqi