[RFC] Make mapper output blocks consistent with num_reducers
Background
When building the chunk graph, Mars does not add chunks that are not used by any downstream chunk, because it builds the graph backward, from the final chunks to the first. This makes shuffle optimization tricky because some reducers may be missing. For example, we may want to build the map-reduce DAG dynamically based on reducer ordinal: after we get the mapper shuffle block refs, we pass each one to the corresponding reducer based on its reducer ordinal. If some reducers are not in the graph, we can't know which blocks should be passed to which reducer.
Mars describe is an example. It uses the PSRS algorithm, which has a shuffle stage named PSRSAlign. Sometimes some reducer outputs of this shuffle are not used, and the resulting chunk graph has fewer reducer chunks than mapper blocks.
In the following example, we have three PSRSAlign mappers, each producing three shuffle blocks, but only two of the three reducers end up in the final chunk graph. We need a way to make mapper blocks consistent with num_reducers.
import numpy as np
import pandas as pd
from mars.dataframe import from_pandas_series


def test_describe_execution(setup):
    s_raw = pd.Series(np.random.rand(10))
    # test multi chunks
    series = from_pandas_series(s_raw, chunk_size=3)
    r = series.describe(percentiles=[])
    r.execute().fetch()
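To make the mismatch concrete, here is a minimal sketch in plain Python, not Mars code. It assumes each mapper returns its shuffle blocks as a list ordered by reducer ordinal; `route_by_position` and the block labels such as `m0-r1` are hypothetical names used only for illustration.

```python
from typing import List


def route_by_position(
    mapper_blocks: List[List[str]], n_graph_reducers: int
) -> List[List[str]]:
    """Hand the i-th block of every mapper to the i-th reducer in the graph.

    This only works if the graph contains one reducer per mapper block.
    """
    n_blocks = len(mapper_blocks[0])
    if n_blocks != n_graph_reducers:
        # With fewer reducers than blocks, position alone can't tell us
        # which reducer ordinal each remaining reducer stands for.
        raise ValueError(
            f"each mapper produced {n_blocks} blocks, "
            f"but the chunk graph has {n_graph_reducers} reducers"
        )
    return [[blocks[i] for blocks in mapper_blocks] for i in range(n_graph_reducers)]


# Three mappers, each producing three shuffle blocks (one per reducer).
mapper_blocks = [[f"m{m}-r{r}" for r in range(3)] for m in range(3)]

# All three reducers present: routing is unambiguous.
full = route_by_position(mapper_blocks, 3)

# Only two reducers survived graph building: routing fails.
try:
    route_by_position(mapper_blocks, 2)
    mismatch_detected = False
except ValueError:
    mismatch_detected = True
```

With three reducers, `full[1]` holds the second block of every mapper. With two, the sketch refuses to guess, which is the situation the solutions below try to avoid.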
Solution1: Add whitelist to mapper operands
Since we don't know which reducers are missing, we can't give mappers a blacklist to filter out their output.
We can only pass a reducer index whitelist to mapper operands, which will be used to filter shuffle blocks.
This solution makes every mapper record a lot of metadata, which will make the supervisor a bottleneck for large-scale tasks.
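A minimal sketch of the whitelist idea, in plain Python rather than Mars code. `filter_blocks` and `reducer_whitelist` are hypothetical names, and the sketch assumes a mapper's blocks are ordered by reducer index.

```python
from typing import Dict, List, Set


def filter_blocks(blocks: List[str], reducer_whitelist: Set[int]) -> Dict[int, str]:
    """Keep only the blocks whose reducer index is in the whitelist."""
    return {i: block for i, block in enumerate(blocks) if i in reducer_whitelist}


# One mapper's output: one block per reducer index.
blocks = ["r0", "r1", "r2"]

# Reducer 1 is not in the chunk graph, so only 0 and 2 are whitelisted.
kept = filter_blocks(blocks, {0, 2})
```

In this sketch every mapper carries its own copy of the whitelist, so the recorded metadata grows with both the number of mappers and the number of reducers, which is the cost described above.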
Solution2: Add n_reducers and reducer_ordinal to MapReducerOperand
If we add n_reducers and reducer_ordinal when creating MapReducerOperand, it will not be necessary to get them from the chunk graph.
The problem with this is that we will need to do it for every shuffle operand.
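The idea can be sketched as follows. This is plain Python under stated assumptions, not the Mars operand class: `ShuffleOperandSketch` and `blocks_for_reducer` are hypothetical, and only the field names n_reducers and reducer_ordinal come from the proposal.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ShuffleOperandSketch:
    """Hypothetical stand-in for a shuffle operand; not the Mars class."""

    stage: str  # "map" or "reduce"
    n_reducers: int  # total reducers of the shuffle, set at creation time
    reducer_ordinal: Optional[int] = None  # only meaningful on reducers


def blocks_for_reducer(
    mapper_outputs: List[List[str]], reducer: ShuffleOperandSketch
) -> List[str]:
    """Pick each mapper's block for this reducer using its own ordinal."""
    if reducer.reducer_ordinal is None:
        raise ValueError("reducer_ordinal must be set on a reducer operand")
    for blocks in mapper_outputs:
        if len(blocks) != reducer.n_reducers:
            raise ValueError("mapper output does not match n_reducers")
    return [blocks[reducer.reducer_ordinal] for blocks in mapper_outputs]


# Three mappers with three blocks each; reducer 1 is absent from the graph.
mapper_outputs = [[f"m{m}-r{r}" for r in range(3)] for m in range(3)]
graph_reducers = [
    ShuffleOperandSketch("reduce", n_reducers=3, reducer_ordinal=0),
    ShuffleOperandSketch("reduce", n_reducers=3, reducer_ordinal=2),
]
routed = {
    r.reducer_ordinal: blocks_for_reducer(mapper_outputs, r) for r in graph_reducers
}
```

Because each reducer carries its own ordinal, routing no longer depends on how many reducers made it into the chunk graph.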
Solution3
Add the ignored reducers to the chunk graph. For most compute tasks, no reducers are ignored, so the additional scheduling for these unnecessary subtasks is acceptable.
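As a rough sketch of what filling in the ignored reducers could look like, the snippet below works on plain reducer ordinals rather than Mars chunks. `complete_reducers` is a hypothetical helper, and it assumes the total number of reducers is known at the point where the graph is completed.

```python
from typing import Iterable, List, Tuple


def complete_reducers(
    present: Iterable[int], n_reducers: int
) -> Tuple[List[int], List[int]]:
    """Return (all reducer ordinals, ordinals that had to be added)."""
    present_set = set(present)
    unknown = present_set - set(range(n_reducers))
    if unknown:
        raise ValueError(f"ordinals out of range: {sorted(unknown)}")
    missing = [i for i in range(n_reducers) if i not in present_set]
    return list(range(n_reducers)), missing


# Reducers 0 and 2 are in the graph; reducer 1 was ignored.
all_ordinals, added = complete_reducers([0, 2], n_reducers=3)
```

When nothing was ignored, `added` is empty and no extra subtasks are scheduled, which matches the expectation that the common case pays no cost.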