incubator-uniffle
[#2212] feat(client,server,coordinator): Introduce ServiceVersion framework to support backward compatibility and reduce BlockIdLayout messages
What changes were proposed in this pull request?
- Introduce a ServiceVersion framework to support backward compatibility.
- Reduce BlockIdLayout messages by storing the layout on the server through registerShuffle and reading it back for each getShuffleResult and getShuffleResultForMultiPart call.
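The second change can be pictured on the server side roughly as follows. This is a minimal sketch, not the PR's code: `LayoutStoreSketch`, its methods, and the map keyed by app and shuffle id are hypothetical stand-ins for the state the PR keeps in ShuffleTaskInfo, and the fallback order (request first, then stored value) is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LayoutStoreSketch {
  /** Bit widths of the block id fields, mirroring what the audit log prints. */
  static final class BlockIdLayout {
    final int sequenceNoBits;
    final int partitionIdBits;
    final int taskAttemptIdBits;

    BlockIdLayout(int sequenceNoBits, int partitionIdBits, int taskAttemptIdBits) {
      this.sequenceNoBits = sequenceNoBits;
      this.partitionIdBits = partitionIdBits;
      this.taskAttemptIdBits = taskAttemptIdBits;
    }
  }

  // Hypothetical stand-in for the per-shuffle state the PR keeps in ShuffleTaskInfo.
  private final Map<String, BlockIdLayout> layoutByShuffle = new ConcurrentHashMap<>();

  private static String key(String appId, int shuffleId) {
    return appId + "/" + shuffleId;
  }

  /** Called once per shuffle; a null layout means the client did not send one. */
  void registerShuffle(String appId, int shuffleId, BlockIdLayout layout) {
    if (layout != null) {
      layoutByShuffle.put(key(appId, shuffleId), layout);
    }
  }

  /** Assumed order: use the layout carried by the request, else the registered one. */
  BlockIdLayout resolveLayout(String appId, int shuffleId, BlockIdLayout fromRequest) {
    if (fromRequest != null) {
      return fromRequest;
    }
    BlockIdLayout stored = layoutByShuffle.get(key(appId, shuffleId));
    if (stored == null) {
      throw new IllegalStateException("No blockIdLayout for " + key(appId, shuffleId));
    }
    return stored;
  }
}
```

With this shape, a client that registered its layout once can leave the field empty on every later read request, while a client that still sends it per request keeps working.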
Why are the changes needed?
Fix: #2212
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Test locally
- New client -> New Coordinator/Server: uses the blockIdLayout stored in ShuffleTaskInfo by registerShuffleRequest.
[2024-10-22 16:42:18.685] cmd=registerShuffle statusCode=SUCCESS from=/localhost:55338 executionTimeUs=47161 appId=app-20241022164214-0077_1729586532865 shuffleId=0 args{remoteStoragePath=, user=root, stageAttemptNumber=0, blockIdLayout=null}
[2024-10-22 16:42:20.555] cmd=requireBuffer statusCode=SUCCESS from=/localhost:55352 executionTimeUs=9604 appId=app-20241022164214-0077_1729586532865 shuffleId=0 args{requireSize=251, partitionIdsSize=1} return{requireBufferId=1}
[2024-10-22 16:42:20.555] cmd=requireBuffer statusCode=SUCCESS from=/localhost:55352 executionTimeUs=10581 appId=app-20241022164214-0077_1729586532865 shuffleId=0 args{requireSize=457, partitionIdsSize=2} return{requireBufferId=2}
[2024-10-22 16:42:20.680] cmd=sendShuffleData statusCode=SUCCESS from=/localhost:55136 executionTimeUs=56345 appId=app-20241022164214-0077_1729586532865 shuffleId=0 args{requireBufferId=1, requireSize=251, isPreAllocated=true, requireBlocksSize=48, stageAttemptNumber=0, partitionCount=1}
[2024-10-22 16:42:20.683] cmd=sendShuffleData statusCode=SUCCESS from=/localhost:55136 executionTimeUs=607 appId=app-20241022164214-0077_1729586532865 shuffleId=0 args{requireBufferId=2, requireSize=457, isPreAllocated=true, requireBlocksSize=110, stageAttemptNumber=0, partitionCount=2}
[2024-10-22 16:42:20.719] cmd=commitShuffleTask statusCode=SUCCESS from=/localhost:55352 executionTimeUs=6617 appId=app-20241022164214-0077_1729586532865 shuffleId=0 return{commitCount=2}
[2024-10-22 16:42:20.719] cmd=commitShuffleTask statusCode=SUCCESS from=/localhost:55352 executionTimeUs=6456 appId=app-20241022164214-0077_1729586532865 shuffleId=0 return{commitCount=1}
[2024-10-22 16:42:20.948] cmd=reportShuffleResult statusCode=SUCCESS from=/localhost:55352 executionTimeUs=16630 appId=app-20241022164214-0077_1729586532865 shuffleId=0 args{taskAttemptId=0, bitmapNum=1, partitionToBlockIdsSize=2}
[2024-10-22 16:42:21.771] cmd=finishShuffle statusCode=SUCCESS from=/localhost:55352 executionTimeUs=1028454 appId=app-20241022164214-0077_1729586532865 shuffleId=0
[2024-10-22 16:42:22.114] cmd=reportShuffleResult statusCode=SUCCESS from=/localhost:55352 executionTimeUs=2799 appId=app-20241022164214-0077_1729586532865 shuffleId=0 args{taskAttemptId=4, bitmapNum=1, partitionToBlockIdsSize=1}
[2024-10-22 16:42:22.513] cmd=getShuffleResultForMultiPart statusCode=SUCCESS from=/localhost:55352 executionTimeUs=27809 appId=app-20241022164214-0077_1729586532865 shuffleId=0 args{partitionsListSize=1, blockIdLayout=blockIdLayout[seq: 21 bits, part: 20 bits, task: 22 bits]}
[2024-10-22 16:42:22.513] cmd=getShuffleResultForMultiPart statusCode=SUCCESS from=/localhost:55352 executionTimeUs=27843 appId=app-20241022164214-0077_1729586532865 shuffleId=0 args{partitionsListSize=1, blockIdLayout=blockIdLayout[seq: 21 bits, part: 20 bits, task: 22 bits]}
[2024-10-22 16:42:22.627] cmd=getLocalShuffleIndex statusCode=SUCCESS from=/localhost:55138 executionTimeUs=23477 appId=app-20241022164214-0077_1729586532865 shuffleId=0 args{requestId=2, partitionId=0, partitionNumPerRange=1, partitionNum=2} return{len=40}
[2024-10-22 16:42:22.628] cmd=getLocalShuffleIndex statusCode=SUCCESS from=/localhost:55138 executionTimeUs=706 appId=app-20241022164214-0077_1729586532865 shuffleId=0 args{requestId=3, partitionId=1, partitionNumPerRange=1, partitionNum=2} return{len=80}
[2024-10-22 16:42:22.652] cmd=getLocalShuffleData statusCode=SUCCESS from=/localhost:55136 executionTimeUs=3129 appId=app-20241022164214-0077_1729586532865 shuffleId=0 args{requestId=4, partitionId=0, partitionNumPerRange=1, partitionNum=2, offset=0, length=13} return{len=13}
[2024-10-22 16:42:22.655] cmd=getLocalShuffleData statusCode=SUCCESS from=/localhost:55136 executionTimeUs=975 appId=app-20241022164214-0077_1729586532865 shuffleId=0 args{requestId=5, partitionId=1, partitionNumPerRange=1, partitionNum=2, offset=0, length=49} return{len=49}
- Old client -> New Coordinator/Server: uses the blockIdLayout from getShuffleReportRequest; nothing changes.
[2024-10-22 16:55:47.711] cmd=registerShuffle statusCode=SUCCESS from=/localhost:38382 executionTimeUs=5681 appId=app-20241022165543-0078_1729587342008 shuffleId=0 args{remoteStoragePath=, user=root, stageAttemptNumber=0, blockIdLayout=blockIdLayout[seq: 21 bits, part: 20 bits, task: 22 bits]}
[2024-10-22 16:55:49.571] cmd=requireBuffer statusCode=SUCCESS from=/localhost:38386 executionTimeUs=4007 appId=app-20241022165543-0078_1729587342008 shuffleId=0 args{requireSize=251, partitionIdsSize=1} return{requireBufferId=4}
[2024-10-22 16:55:49.572] cmd=requireBuffer statusCode=SUCCESS from=/localhost:38386 executionTimeUs=4550 appId=app-20241022165543-0078_1729587342008 shuffleId=0 args{requireSize=457, partitionIdsSize=2} return{requireBufferId=3}
[2024-10-22 16:55:49.651] cmd=sendShuffleData statusCode=SUCCESS from=/localhost:34940 executionTimeUs=11168 appId=app-20241022165543-0078_1729587342008 shuffleId=0 args{requireBufferId=3, requireSize=457, isPreAllocated=true, requireBlocksSize=110, stageAttemptNumber=0, partitionCount=2}
[2024-10-22 16:55:49.651] cmd=sendShuffleData statusCode=SUCCESS from=/localhost:34926 executionTimeUs=10189 appId=app-20241022165543-0078_1729587342008 shuffleId=0 args{requireBufferId=4, requireSize=251, isPreAllocated=true, requireBlocksSize=48, stageAttemptNumber=0, partitionCount=1}
[2024-10-22 16:55:49.707] cmd=commitShuffleTask statusCode=SUCCESS from=/localhost:38386 executionTimeUs=5040 appId=app-20241022165543-0078_1729587342008 shuffleId=0 return{commitCount=2}
[2024-10-22 16:55:49.707] cmd=commitShuffleTask statusCode=SUCCESS from=/localhost:38386 executionTimeUs=5149 appId=app-20241022165543-0078_1729587342008 shuffleId=0 return{commitCount=1}
[2024-10-22 16:55:49.890] cmd=reportShuffleResult statusCode=SUCCESS from=/localhost:38386 executionTimeUs=2541 appId=app-20241022165543-0078_1729587342008 shuffleId=0 args{taskAttemptId=0, bitmapNum=1, partitionToBlockIdsSize=2}
[2024-10-22 16:55:50.742] cmd=finishShuffle statusCode=SUCCESS from=/localhost:38386 executionTimeUs=1006461 appId=app-20241022165543-0078_1729587342008 shuffleId=0
[2024-10-22 16:55:51.080] cmd=reportShuffleResult statusCode=SUCCESS from=/localhost:38386 executionTimeUs=1851 appId=app-20241022165543-0078_1729587342008 shuffleId=0 args{taskAttemptId=4, bitmapNum=1, partitionToBlockIdsSize=1}
[2024-10-22 16:55:51.497] cmd=getShuffleResultForMultiPart statusCode=SUCCESS from=/localhost:38386 executionTimeUs=3230 appId=app-20241022165543-0078_1729587342008 shuffleId=0 args{partitionsListSize=1, blockIdLayout=null}
[2024-10-22 16:55:51.497] cmd=getShuffleResultForMultiPart statusCode=SUCCESS from=/localhost:38386 executionTimeUs=7935 appId=app-20241022165543-0078_1729587342008 shuffleId=0 args{partitionsListSize=1, blockIdLayout=null}
[2024-10-22 16:55:51.578] cmd=getLocalShuffleIndex statusCode=SUCCESS from=/localhost:34926 executionTimeUs=4631 appId=app-20241022165543-0078_1729587342008 shuffleId=0 args{requestId=2, partitionId=0, partitionNumPerRange=1, partitionNum=2} return{len=40}
[2024-10-22 16:55:51.578] cmd=getLocalShuffleIndex statusCode=SUCCESS from=/localhost:34940 executionTimeUs=5751 appId=app-20241022165543-0078_1729587342008 shuffleId=0 args{requestId=3, partitionId=1, partitionNumPerRange=1, partitionNum=2} return{len=80}
[2024-10-22 16:55:51.604] cmd=getLocalShuffleData statusCode=SUCCESS from=/localhost:34926 executionTimeUs=1735 appId=app-20241022165543-0078_1729587342008 shuffleId=0 args{requestId=5, partitionId=1, partitionNumPerRange=1, partitionNum=2, offset=0, length=49} return{len=49}
[2024-10-22 16:55:51.604] cmd=getLocalShuffleData statusCode=SUCCESS from=/localhost:34940 executionTimeUs=1735 appId=app-20241022165543-0078_1729587342008 shuffleId=0 args{requestId=4, partitionId=0, partitionNumPerRange=1, partitionNum=2, offset=0, length=13} return{len=13}
- New client -> Old Coordinator/New Server: uses the blockIdLayout from getShuffleReportRequest; nothing changes.
[2024-10-22 18:53:38.191] cmd=registerShuffle statusCode=SUCCESS from=/localhost:54010 executionTimeUs=5038832 appId=app-20241022185329-0080_1729594407610 shuffleId=0 args{remoteStoragePath=, user=root, stageAttemptNumber=0, blockIdLayout=blockIdLayout[seq: 21 bits, part: 20 bits, task: 22 bits]}
[2024-10-22 18:53:40.106] cmd=requireBuffer statusCode=SUCCESS from=/localhost:54026 executionTimeUs=10600 appId=app-20241022185329-0080_1729594407610 shuffleId=0 args{requireSize=457, partitionIdsSize=2} return{requireBufferId=1}
[2024-10-22 18:53:40.106] cmd=requireBuffer statusCode=SUCCESS from=/localhost:54026 executionTimeUs=11154 appId=app-20241022185329-0080_1729594407610 shuffleId=0 args{requireSize=251, partitionIdsSize=1} return{requireBufferId=2}
[2024-10-22 18:53:40.264] cmd=sendShuffleData statusCode=SUCCESS from=/localhost:60484 executionTimeUs=82391 appId=app-20241022185329-0080_1729594407610 shuffleId=0 args{requireBufferId=2, requireSize=251, isPreAllocated=true, requireBlocksSize=48, stageAttemptNumber=0, partitionCount=1}
[2024-10-22 18:53:40.264] cmd=sendShuffleData statusCode=SUCCESS from=/localhost:60486 executionTimeUs=83382 appId=app-20241022185329-0080_1729594407610 shuffleId=0 args{requireBufferId=1, requireSize=457, isPreAllocated=true, requireBlocksSize=110, stageAttemptNumber=0, partitionCount=2}
[2024-10-22 18:53:40.327] cmd=commitShuffleTask statusCode=SUCCESS from=/localhost:54026 executionTimeUs=16662 appId=app-20241022185329-0080_1729594407610 shuffleId=0 return{commitCount=1}
[2024-10-22 18:53:40.327] cmd=commitShuffleTask statusCode=SUCCESS from=/localhost:54026 executionTimeUs=16689 appId=app-20241022185329-0080_1729594407610 shuffleId=0 return{commitCount=2}
[2024-10-22 18:53:40.514] cmd=reportShuffleResult statusCode=SUCCESS from=/localhost:54026 executionTimeUs=5713 appId=app-20241022185329-0080_1729594407610 shuffleId=0 args{taskAttemptId=4, bitmapNum=1, partitionToBlockIdsSize=1}
[2024-10-22 18:53:41.376] cmd=finishShuffle statusCode=SUCCESS from=/localhost:54026 executionTimeUs=1025196 appId=app-20241022185329-0080_1729594407610 shuffleId=0
[2024-10-22 18:53:41.701] cmd=reportShuffleResult statusCode=SUCCESS from=/localhost:54026 executionTimeUs=3741 appId=app-20241022185329-0080_1729594407610 shuffleId=0 args{taskAttemptId=0, bitmapNum=1, partitionToBlockIdsSize=2}
[2024-10-22 18:53:42.151] cmd=getShuffleResultForMultiPart statusCode=SUCCESS from=/localhost:54026 executionTimeUs=33402 appId=app-20241022185329-0080_1729594407610 shuffleId=0 args{partitionsListSize=1, blockIdLayout=blockIdLayout[seq: 21 bits, part: 20 bits, task: 22 bits]}
[2024-10-22 18:53:42.151] cmd=getShuffleResultForMultiPart statusCode=SUCCESS from=/localhost:54026 executionTimeUs=32826 appId=app-20241022185329-0080_1729594407610 shuffleId=0 args{partitionsListSize=1, blockIdLayout=blockIdLayout[seq: 21 bits, part: 20 bits, task: 22 bits]}
[2024-10-22 18:53:42.248] cmd=getLocalShuffleIndex statusCode=SUCCESS from=/localhost:60486 executionTimeUs=20195 appId=app-20241022185329-0080_1729594407610 shuffleId=0 args{requestId=2, partitionId=0, partitionNumPerRange=1, partitionNum=2} return{len=40}
[2024-10-22 18:53:42.249] cmd=getLocalShuffleIndex statusCode=SUCCESS from=/localhost:60486 executionTimeUs=629 appId=app-20241022185329-0080_1729594407610 shuffleId=0 args{requestId=3, partitionId=1, partitionNumPerRange=1, partitionNum=2} return{len=80}
[2024-10-22 18:53:42.268] cmd=getLocalShuffleData statusCode=SUCCESS from=/localhost:60484 executionTimeUs=1727 appId=app-20241022185329-0080_1729594407610 shuffleId=0 args{requestId=4, partitionId=1, partitionNumPerRange=1, partitionNum=2, offset=0, length=49} return{len=49}
[2024-10-22 18:53:42.269] cmd=getLocalShuffleData statusCode=SUCCESS from=/localhost:60484 executionTimeUs=370 appId=app-20241022185329-0080_1729594407610 shuffleId=0 args{requestId=5, partitionId=0, partitionNumPerRange=1, partitionNum=2, offset=0, length=13} return{len=13}
- New client -> New Coordinator/Old Server: uses the blockIdLayout from getShuffleReportRequest; nothing changes.
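The four scenario descriptions amount to one rule: the layout can be left out of read requests only when the client, coordinator, and server are all new. A hedged sketch of that rule follows. The class name, the integer version constants, and the idea of taking the minimum of the three versions are all assumptions made for illustration; the PR's actual ServiceVersion values and negotiation are not shown in this thread.

```java
class LayoutNegotiationSketch {
  // Hypothetical version numbers; the PR's real ServiceVersion values are not shown here.
  static final int LEGACY_VERSION = 0;
  static final int LAYOUT_AT_REGISTER_VERSION = 1;

  /**
   * Returns true when the client may omit blockIdLayout from getShuffleResult and
   * getShuffleResultForMultiPart requests because the server already stored it.
   * Assumption: the oldest of the three components decides.
   */
  static boolean canOmitLayoutOnRead(int clientVersion, int coordinatorVersion, int serverVersion) {
    int negotiated = Math.min(clientVersion, Math.min(coordinatorVersion, serverVersion));
    return negotiated >= LAYOUT_AT_REGISTER_VERSION;
  }
}
```

Only the all-new combination returns true; any legacy component keeps the per-request layout, which matches the "nothing changes" note on the other three scenarios.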
Test Results
2 926 files +29, 2 926 suites +29, 6h 16m 45s :stopwatch: +35m 38s
1 049 tests ±0: 1 047 :white_check_mark: +2, 2 :zzz: ±0, 0 :x: -1
13 045 runs +62: 13 015 :white_check_mark: +64, 30 :zzz: ±0, 0 :x: -1
Results for commit 2cac65a1. ± Comparison against base commit 2b70eb4d.
:recycle: This comment has been updated with latest results.
@jerqi Would you like to take a look at this PR? Thanks!
There is already a server version; see Constants::SHUFFLE_SERVER_VERSION.
@jerqi Thanks for the reminder. After going through the code related to SHUFFLE_SERVER_VERSION, I have a basic understanding of it:
- SHUFFLE_SERVER_VERSION is used as a tag on the server.
- The Coordinator selects servers for a client according to the client's tag filter.
- It is also related to metrics.
In conclusion, I guess the motivation for SHUFFLE_SERVER_VERSION was to select servers with the same version/tag for a client. A string tag/version is easy to understand but hard to extend: if we build a new feature on top of ss_v5, an old client cannot use ss_v5 as a tag to get an assignment, even though the new feature includes the ss_v5 feature.
The motivation for ServiceVersion, in contrast, is to let a client adapt to servers with different serviceVersion values, so that they all work together.
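The difference between the two approaches can be sketched as follows. This is an illustration under assumptions, not Uniffle's code: the `ss_v6` tag, the class and method names, and the integer versions are hypothetical, and the tag filter is modeled as a simple "server carries every requested tag" check.

```java
import java.util.Set;

class VersionMatchSketch {
  /** Tag-style selection: the server must carry every tag the client asks for. */
  static boolean tagMatch(Set<String> serverTags, Set<String> clientTags) {
    return serverTags.containsAll(clientTags);
  }

  /** Ordered-version check: any server at or above the feature's version qualifies. */
  static boolean supports(int serverVersion, int featureSinceVersion) {
    return serverVersion >= featureSinceVersion;
  }
}
```

Under these assumptions, a server tagged only with a hypothetical `ss_v6` is rejected for a client that asks for `ss_v5` (`tagMatch(Set.of("ss_v6"), Set.of("ss_v5"))` is false), whereas an ordered check such as `supports(6, 5)` accepts it.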
No. If we make an incompatible update, we should change the version.