Memory optimization for converting Spark DataFrame to SparkXShards
Tested with the recsys data; scripts are in friesian PR #99. The data is the first 16 parquet partitions.
Memory
| parquet files | records | read as train_df/test_df | shard_size | df2sparkxshards peak memory | df2sparkxshards stable memory |
|---|---|---|---|---|---|
| 16 | 6063285 | 5.9G | 1000 | 25.7G | 25.7G |
| 16 | 6063285 | 5.9G | None | 36.6G | 25.7G |
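For context, a minimal sketch of the kind of driver behind these numbers; the parquet path is a placeholder, and `df_to_SparkXShards(df, shard_size=...)` only stands in for the conversion entry point in the PR #99 scripts, not a confirmed API:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the 16 parquet part files from the table above (placeholder path).
train_df = spark.read.parquet("hdfs://.../recsys/train_first16")
# Check partition count and row count (~6.06M rows per the table).
print(train_df.rdd.getNumPartitions(), train_df.count())

# shard_size=1000 vs shard_size=None corresponds to the two table rows
# (~25.7G vs ~36.6G peak memory per executor).
# xshards = df_to_SparkXShards(train_df, shard_size=1000)  # hypothetical helper name
```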
For the recsys dataset, if I only save some selected features processed here: https://github.com/analytics-zoo/friesian/blob/recsys-challenge/recsys2021/demo/final/scripts/preprocess.py, the total parquet size is around 3.9G, and when I read the parquet I get 200+ partitions. Then, when converting to SparkXShards, I still get this error:
ERROR YarnScheduler:70 - Lost executor 1 on Almaren-Node-164: Container killed by YARN for exceeding memory limits. 160.2 GB of 160 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
I have already given 8 * 100G of memory, but it still gets OOM on ~4G of data...
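For reference, the settings below illustrate the "8 * 100G" setup and the memoryOverhead knob the error message mentions; the actual launch configuration used by the PR scripts is not shown here, so treat these values and this way of setting them as an assumption:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Illustrative values matching "8 * 100G" above.
    .config("spark.executor.instances", "8")
    .config("spark.executor.memory", "100g")
    # The YARN error above suggests also raising the off-heap overhead.
    .config("spark.executor.memoryOverhead", "20g")
    .getOrCreate()
)
```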
If I repartition to, say, 2000 partitions, it works.
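A generic PySpark sketch (not the PR code, placeholder path) of why this helps: check how many rows land in each partition, then repartition before the conversion so each per-partition pandas/numpy copy stays small.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("hdfs://.../recsys/preprocessed")  # ~3.9G, 200+ partitions

# Rows per partition: with only ~200 partitions, each one holds a large number
# of rows, all of which the conversion UDF materializes at once.
rows_per_partition = df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
print("max rows in a single partition:", max(rows_per_partition))

# Repartitioning before the conversion keeps each partition (and therefore each
# pandas/numpy copy) small enough to fit in executor memory.
df = df.repartition(2000)
```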
I tested the version without feature processing, using 12 executors.
I found that reading the data as train_df/test_df took ~30G of memory. The parquet file size on disk might not be a good reference, since parquet is compressed.
The peak memory for converting the DataFrame to SparkXShards is ~130G per worker (an increase of ~100G inside df_to_SparkXShards), and the stable memory afterwards is 80-90G (an increase of 50-60G inside df_to_SparkXShards).
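One way to check the actual in-memory size instead of relying on the on-disk parquet size (reusing train_df from the first sketch above):

```python
from pyspark import StorageLevel

# Cache the DataFrame and force materialization; the Storage tab of the Spark UI
# then shows "Size in Memory". The cached columnar format is still compressed,
# but it is usually a better reference than the parquet size on disk.
train_df.persist(StorageLevel.MEMORY_ONLY)
train_df.count()
```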
If a partition is too large, our UDF that converts the DataFrame to numpy can consume a lot of memory, which could explain why repartitioning works. You could also try setting shard_size to reduce the peak memory.
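To illustrate the mechanism (a generic sketch, not the actual Orca/friesian implementation; the helper name and the wrapping step are assumptions): the per-partition conversion roughly looks like the following, where shard_size=None materializes a whole Spark partition as one pandas block, while a small shard_size only duplicates one chunk of rows at a time.

```python
import pandas as pd


def partition_to_shards(iterator, columns, shard_size=None):
    """Turn the rows of one Spark partition into pandas DataFrame shards.

    With shard_size=None the whole partition becomes a single pandas DataFrame
    (high peak memory for large partitions); with a small shard_size the rows
    are buffered and flushed chunk by chunk, bounding the extra copy.
    """
    buffer = []
    for row in iterator:
        buffer.append(row)
        if shard_size is not None and len(buffer) >= shard_size:
            yield pd.DataFrame(buffer, columns=columns)
            buffer = []
    if buffer:
        yield pd.DataFrame(buffer, columns=columns)


# Hypothetical usage: the resulting RDD of pandas DataFrames is what would be
# wrapped into a SparkXShards.
# cols = df.columns  # capture column names outside the closure
# shard_rdd = df.rdd.mapPartitions(
#     lambda it: partition_to_shards(it, cols, shard_size=1000))
```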
I will try to optimize the memory usage when converting the DataFrame to SparkXShards, and also either guide users to set shard_size or set a default value to mitigate the memory issue.