incubator-uniffle

[Improvement] Support more tasks of the application

Open jerqi opened this issue 3 years ago • 4 comments

The current blockId is designed as follows:

 // BlockId is long and composed by partitionId, executorId and AtomicInteger
 // AtomicInteger is first 19 bit, max value is 2^19 - 1
 // partitionId is next 24 bit, max value is 2^24 - 1
 // taskAttemptId is rest of 20 bit, max value is 2^20 - 1
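The layout described in the comment above can be sketched as plain bit packing. This is only an illustration under stated assumptions: the class and method names are hypothetical and not Uniffle's API, and "first" is assumed to mean the most significant bits of the 63 used bits.

```java
// Hypothetical sketch of the 19/24/20-bit layout; names are illustrative only.
final class BlockIdLayoutSketch {
    static final int SEQUENCE_BITS = 19;        // AtomicInteger sequence number
    static final int PARTITION_ID_BITS = 24;    // partitionId
    static final int TASK_ATTEMPT_ID_BITS = 20; // taskAttemptId

    static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;
    static final long MAX_PARTITION_ID = (1L << PARTITION_ID_BITS) - 1;
    static final long MAX_TASK_ATTEMPT_ID = (1L << TASK_ATTEMPT_ID_BITS) - 1;

    // Packs the three fields into one long; 19 + 24 + 20 = 63 bits, so the sign bit stays 0.
    static long pack(long sequenceNo, long partitionId, long taskAttemptId) {
        check("sequenceNo", sequenceNo, MAX_SEQUENCE);
        check("partitionId", partitionId, MAX_PARTITION_ID);
        check("taskAttemptId", taskAttemptId, MAX_TASK_ATTEMPT_ID);
        return (sequenceNo << (PARTITION_ID_BITS + TASK_ATTEMPT_ID_BITS))
            | (partitionId << TASK_ATTEMPT_ID_BITS)
            | taskAttemptId;
    }

    static long sequenceNo(long blockId) {
        return blockId >>> (PARTITION_ID_BITS + TASK_ATTEMPT_ID_BITS);
    }

    static long partitionId(long blockId) {
        return (blockId >>> TASK_ATTEMPT_ID_BITS) & MAX_PARTITION_ID;
    }

    static long taskAttemptId(long blockId) {
        return blockId & MAX_TASK_ATTEMPT_ID;
    }

    // Rejects values that do not fit into the field's bit width.
    private static void check(String name, long value, long max) {
        if (value < 0 || value > max) {
            throw new IllegalArgumentException(name + " out of range [0, " + max + "]: " + value);
        }
    }

    public static void main(String[] args) {
        long blockId = pack(1, 2, 3);
        System.out.println(blockId + " -> " + sequenceNo(blockId) + ", "
            + partitionId(blockId) + ", " + taskAttemptId(blockId));
    }
}
```

In this sketch a taskAttemptId of 2^20 is rejected, which is the limitation this issue is about.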

Why do we need blockId? It's designed for data checks, filtering, reading in-memory data, etc.

Why is blockId designed as above? BlockIds are stored in the shuffle server, and RoaringBitmap is used to cache them to reduce memory cost. Given how RoaringBitmap is implemented, the BlockId layout aims to use BitmapContainer instead of ArrayContainer to save memory.

What's the problem with blockId? It can't support a taskId greater than 2^20 - 1.

Proposal: I think the first 19 bits are too many for the atomic int, and we can use some of them for taskId.

jerqi avatar Aug 06 '22 02:08 jerqi

Currently we can support 2^20 tasks, which is not a small number. If spark.rss.writer.buffer.size is set to the default value of 3m, then the data written by a taskAttempt does not exceed 3m * 2^19 = 1536G, which can be guaranteed. How do we measure whether AtomicInteger can be reduced, and by how much?
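The arithmetic behind that estimate can be checked with a short sketch. It assumes that "3m" means 3 MiB, that every block holds one full buffer, and that each block consumes one sequence number; the class and method names are hypothetical.

```java
// Hypothetical sketch: upper bound on data per taskAttempt for a given sequence bit width.
final class SequenceCapacitySketch {
    // Max bytes = bytes per block * number of distinct sequence numbers (2^sequenceBits).
    static long maxBytesPerTaskAttempt(long bytesPerBlock, int sequenceBits) {
        return Math.multiplyExact(bytesPerBlock, 1L << sequenceBits);
    }

    static double toGiB(long bytes) {
        return bytes / (double) (1L << 30);
    }

    public static void main(String[] args) {
        long bufferSize = 3L << 20; // 3 MiB, assumed meaning of "3m"
        for (int bits = 19; bits >= 16; bits--) {
            long bound = maxBytesPerTaskAttempt(bufferSize, bits);
            System.out.printf("%d sequence bits -> %.0f GiB%n", bits, toGiB(bound));
        }
    }
}
```

Under these assumptions, each bit moved from the sequence number to taskId halves the bound, for example from 1536 GiB at 19 bits to 768 GiB at 18 bits.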

leixm avatar Aug 25 '22 16:08 leixm

It looks like taskAttemptId has been modified from 2^20 to 2^21, but the comment for org.apache.uniffle.client.util.ClientUtils has not been modified.

org.apache.uniffle.common.util.Constants

 public static final int TASK_ATTEMPT_ID_MAX_LENGTH = 21;
 public static final int ATOMIC_INT_MAX_LENGTH = 18;

org.apache.uniffle.client.util.ClientUtils

 // AtomicInteger is first 19 bit, max value is 2^19 - 1
 // taskAttemptId is rest of 20 bit, max value is 2^20 - 1

leixm avatar Aug 26 '22 02:08 leixm

It looks like taskAttemptId has been modified from 2^20 to 2^21, but the comment for org.apache.uniffle.client.util.ClientUtils has not been modified.

org.apache.uniffle.common.util.Constants

 public static final int TASK_ATTEMPT_ID_MAX_LENGTH = 21;
 public static final int ATOMIC_INT_MAX_LENGTH = 18;

org.apache.uniffle.client.util.ClientUtils

 // AtomicInteger is first 19 bit, max value is 2^19 - 1
 // taskAttemptId is rest of 20 bit, max value is 2^20 - 1

You're right.

jerqi avatar Aug 26 '22 03:08 jerqi

Currently we can support 2^20 tasks, which is not a small number. If spark.rss.writer.buffer.size is set to the default value of 3m, then the data written by a taskAttempt does not exceed 3m * 2^19 = 1536G, which can be guaranteed. How do we measure whether AtomicInteger can be reduced, and by how much?

It depends on the applications running in the production environment.

jerqi avatar Aug 26 '22 03:08 jerqi

If spark.rss.writer.buffer.size is set to the default value of 3m, then the data written by a taskAttempt does not exceed 3m*2^19=1536G

You also have to take spark.rss.writer.buffer.spill.size (default of 128m) into consideration. Once that amount of memory is occupied by all buffers (one buffer per partition), all buffers will be sent to the server, no matter how full the buffers are. If you map to 100,000 partitions, you will flush 100,000 buffers of 1.28 kByte each. This will cause high sequence numbers, so this will then work with at most 0.64G.
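A rough sketch of this estimate, following the reasoning above: it assumes that "128m" means 128 MiB, that the spill threshold is spread evenly over all partition buffers, and that every flushed buffer consumes one of the 2^19 sequence numbers. The class and method names are hypothetical.

```java
// Hypothetical sketch: how small flushed buffers shrink the data bound per taskAttempt.
final class SpillFlushSketch {
    // Average bytes per flushed buffer when the spill threshold is split evenly over partitions.
    static long averageFlushedBlockBytes(long spillSizeBytes, long partitions) {
        return spillSizeBytes / partitions;
    }

    // Max bytes = bytes per block * number of distinct sequence numbers (2^sequenceBits).
    static long maxBytesPerTaskAttempt(long blockBytes, int sequenceBits) {
        return Math.multiplyExact(blockBytes, 1L << sequenceBits);
    }

    public static void main(String[] args) {
        long spillSize = 128L << 20; // 128 MiB, assumed meaning of "128m"
        long blockBytes = averageFlushedBlockBytes(spillSize, 100_000);
        long bound = maxBytesPerTaskAttempt(blockBytes, 19);
        System.out.printf("block = %d bytes, bound = %.2f GiB%n",
            blockBytes, bound / (double) (1L << 30));
    }
}
```

With binary units throughout, the bound comes out at roughly 0.65 GiB, in line with the estimate above and far below the 1536G obtained when every block is a full 3m buffer.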

EnricoMi avatar Jan 29 '24 20:01 EnricoMi

It looks like taskAttemptId has been modified from 2^20 to 2^21

Fixed in #1492.

EnricoMi avatar Jan 30 '24 07:01 EnricoMi