incubator-uniffle icon indicating copy to clipboard operation
incubator-uniffle copied to clipboard

[FEATURE] Cache index files on the server side

Open xianjingfeng opened this issue 3 years ago • 19 comments

Code of Conduct

Search before asking

  • [X] I have searched in the issues and found no similar issues.

Describe the feature

If we use AQE, index files will be read many times. So we should cache it in memory.

Motivation

For better performance and lower disk io.

Describe the solution

  1. Cache it. Free discussion on specific details.
  2. Filter index data before return to the clients.

Additional context

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

xianjingfeng avatar Dec 13 '22 03:12 xianjingfeng

PTAL @jerqi @zuston

xianjingfeng avatar Dec 13 '22 03:12 xianjingfeng

Do you observe any performance issue for this case?

If the index file is only accessed few times(say 1,2 times), there's no need to cache this file. And normally, the linux system will have file cache to speed up file access.

If the shuffle server is indeed influenced by this problem, I believe we may need an IndexFileAccessManager to trigger the index file caching and manage eviction.

advancedxy avatar Dec 13 '22 09:12 advancedxy

I think this will bring some memory pressure, especially for some huge partitions.

zuston avatar Dec 13 '22 11:12 zuston

Do you observe any performance issue for this case?

No, I just think this should be an optimization point.

If the index file is only accessed few times(say 1,2 times), there's no need to cache this file.

We can just cache the partitions of applicaitions which enable AQE

And normally, the linux system will have file cache to speed up file access.

But linux system will cache the data file too.

xianjingfeng avatar Dec 13 '22 13:12 xianjingfeng

I think this will bring some memory pressure, especially for some huge partitions.

I had counted the files of a server in our production environment. About 10M of data on average generates 1K index file. It means that if we have 10T data, there are about 1G index files. If we compress the index before store it in memory, i think it's acceptable.

xianjingfeng avatar Dec 13 '22 13:12 xianjingfeng

We can just cache the partitions of applicaitions which enable AQE

AQE is enabled by default in later spark versions such as Spark 3.3 and also it may turned on by default in some production envs. So I believe by simply cache index file for all the AQE applications might not be sufficient. The cache behavior might still be triggered by access pattern.

Also another question, in which cases AQE would trigger reading the same partition many times? I know the OptimizeSkewedJoin would split the same partition into multiple parts, and therefore trigger the behavior you described. Is there any other cases?

advancedxy avatar Dec 13 '22 13:12 advancedxy

Also another question, in which cases AQE would trigger reading the same partition many times? I know the OptimizeSkewedJoin would split the same partition into multiple parts, and therefore trigger the behavior you described. Is there any other cases?

rebalance

xianjingfeng avatar Dec 14 '22 03:12 xianjingfeng

About 10M of data on average generates 1K index file. It means that if we have 10T data, there are about 1G index files.

Emm... It's too big. For huge partition, 200G data size is normal.

zuston avatar Dec 14 '22 03:12 zuston

Besides, we'd better to have a unified memory manager in shuffle-server if using this cache.

zuston avatar Dec 14 '22 04:12 zuston

Also another question, in which cases AQE would trigger reading the same partition many times? I know the OptimizeSkewedJoin would split the same partition into multiple parts, and therefore trigger the behavior you described. Is there any other cases?

rebalance

could you elaborate it a bit more? Do you mean OptimizeSkewInRebalancePartitions? I just checked the code and I believe the optimization rule is only applied when user rebalances output and there's indeed some skew partitions.

advancedxy avatar Dec 14 '22 05:12 advancedxy

Emm... It's too big. For huge partition, 200G data size is normal.

I mean even if we cache all the local index files, we won't use too much memory. I think 3g is enough for most servers. For us, the bottleneck is the network and disk, not the memory.

Besides, we'd better to have a unified memory manager in shuffle-server if using this cache.

You are right. And I think we should do this after #133 .

xianjingfeng avatar Dec 14 '22 07:12 xianjingfeng

Do you mean OptimizeSkewInRebalancePartitions? I just checked the code and I believe the optimization rule is only applied when user rebalances output and there's indeed some skew partitions.

Yes. Reduce task will read all index files of all partitions. You can try to run a job and trace the logs.

xianjingfeng avatar Dec 14 '22 07:12 xianjingfeng

Yes. Reduce task will read all index files of all partitions. You can try to run a job and trace the logs.

This doesn't seem right. Do you have a minimal reproduce job to reproduce this case?

advancedxy avatar Dec 14 '22 09:12 advancedxy

This doesn't seem right. Do you have a minimal reproduce job to reproduce this case?

https://github.com/apache/spark/blob/0fd1f85f16502d1d4222cf3d7abfd5e2b86464e6/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala#L228

xianjingfeng avatar Dec 14 '22 13:12 xianjingfeng

If we cache it, we don't even need to flush it to the disk. @zuston @jerqi @advancedxy

xianjingfeng avatar Dec 15 '22 11:12 xianjingfeng

https://github.com/apache/spark/blob/0fd1f85f16502d1d4222cf3d7abfd5e2b86464e6/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala#L228

I didn't get a change to reproduce this case. I will do it in this week.

I still think it should be cached when necessary and selectively.

If we cache it, we don't even need to flush it to the disk

Well, I'm for this idea as I'd like to reduce small I/Os to disk as much as possible. But memory is still an important factor to consider, as we want to deploy Uniffle shuffle server with smaller memory, say 32GB or 48GB.

advancedxy avatar Dec 15 '22 12:12 advancedxy

Well, I'm for this idea as I'd like to reduce small I/Os to disk as much as possible. But memory is still an important factor to consider, as we want to deploy Uniffle shuffle server with smaller memory, say 32GB or 48GB.

This feature can be optional.

xianjingfeng avatar Dec 15 '22 12:12 xianjingfeng

https://github.com/apache/spark/blob/0fd1f85f16502d1d4222cf3d7abfd5e2b86464e6/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala#L228

I didn't get a change to reproduce this case. I will do it in this week.

I still think it should be cached when necessary and selectively.

You can see this method and pay attention to startMapIndex endMapIndex startPartition endPartition when debug the above ut https://github.com/apache/spark/blob/0fd1f85f16502d1d4222cf3d7abfd5e2b86464e6/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala#L124

xianjingfeng avatar Dec 15 '22 12:12 xianjingfeng

Is it possible to store index into rocksdb as rocksdb is well known to store meta data for some distributed storage system, e.g. Alluxio, Ozone.

Or we can store index file into a distributed KV store, for example TIKV?

maobaolong avatar Nov 16 '24 14:11 maobaolong