[Survey] Spark (Hadoop) Support via S3 API
Description
As LeoFS is good at handling small files (images, logs, ...), it may fill the gap left by HDFS, which does not handle small files well.
Environment
Spark 1.6.1 (Hadoop 2.6.2)
Extra Libraries
- hadoop-aws-2.7.1.jar
- aws-java-sdk-1.7.4.jar
Testing
./pyspark --jars /opt/spark-1.6.1-bin-hadoop2.6/lib/hadoop-aws-2.7.1.jar,/opt/spark-1.6.1-bin-hadoop2.6/lib/aws-java-sdk-1.7.4.jar --master yarn-client --num-executors 10
>>> sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://127.0.0.1:12345")
>>> sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "05236")
>>> sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "802562235")
>>>
>>> sc.textFile("s3a://test/test.txt").count()
17/01/31 16:15:12 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 222.4 KB, free 465.2 KB)
17/01/31 16:15:12 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 21.2 KB, free 486.4 KB)
17/01/31 16:15:12 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.11.21:53056 (size: 21.2 KB, free: 511.1 MB)
17/01/31 16:15:12 INFO SparkContext: Created broadcast 1 from textFile at NativeMethodAccessorImpl.java:-2
17/01/31 16:15:13 INFO FileInputFormat: Total input paths to process : 1
17/01/31 16:15:13 INFO SparkContext: Starting job: count at <stdin>:1
17/01/31 16:15:13 INFO DAGScheduler: Got job 0 (count at <stdin>:1) with 2 output partitions
17/01/31 16:15:13 INFO DAGScheduler: Final stage: ResultStage 0 (count at <stdin>:1)
17/01/31 16:15:13 INFO DAGScheduler: Parents of final stage: List()
17/01/31 16:15:13 INFO DAGScheduler: Missing parents: List()
17/01/31 16:15:13 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[5] at count at <stdin>:1), which has no missing parents
17/01/31 16:15:13 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 5.3 KB, free 491.8 KB)
17/01/31 16:15:13 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.4 KB, free 495.2 KB)
17/01/31 16:15:13 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.11.21:53056 (size: 3.4 KB, free: 511.1 MB)
17/01/31 16:15:13 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
17/01/31 16:15:13 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (PythonRDD[5] at count at <stdin>:1)
17/01/31 16:15:13 INFO YarnScheduler: Adding task set 0.0 with 2 tasks
17/01/31 16:15:13 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, spark04, partition 0,PROCESS_LOCAL, 2247 bytes)
17/01/31 16:15:13 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, spark02, partition 1,PROCESS_LOCAL, 2247 bytes)
17/01/31 16:15:14 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on spark02:46011 (size: 3.4 KB, free: 511.5 MB)
17/01/31 16:15:14 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on spark02:46011 (size: 21.2 KB, free: 511.5 MB)
17/01/31 16:15:14 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on spark04:45269 (size: 3.4 KB, free: 511.5 MB)
17/01/31 16:15:14 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on spark04:45269 (size: 21.2 KB, free: 511.5 MB)
17/01/31 16:15:16 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3292 ms on spark04 (1/2)
17/01/31 16:15:18 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 5347 ms on spark02 (2/2)
17/01/31 16:15:18 INFO DAGScheduler: ResultStage 0 (count at <stdin>:1) finished in 5.365 s
17/01/31 16:15:18 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/01/31 16:15:18 INFO DAGScheduler: Job 0 finished: count at <stdin>:1, took 5.413051 s
2993
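For reference, the same s3a settings can also be supplied at submit time instead of being set interactively on sc._jsc: Spark copies any spark.hadoop.* property into the Hadoop configuration. A minimal non-interactive sketch, reusing the placeholder endpoint and credentials from the setup above:

```python
# Minimal sketch: equivalent of the interactive setup above, but done via
# SparkConf so the s3a settings are in place before the first job runs.
# Endpoint and credentials are the placeholder values used in this survey.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("leofs-s3a-survey")
        .set("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:12345")
        .set("spark.hadoop.fs.s3a.access.key", "05236")
        .set("spark.hadoop.fs.s3a.secret.key", "802562235"))

sc = SparkContext(conf=conf)
print(sc.textFile("s3a://test/test.txt").count())
```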
Write Performance Issue
The current write path with s3a:// does not map well onto the S3 API in terms of performance: a single simple write involves more than 3,000 operations against LeoFS.
[HEAD] test test/test20.txt 0 0 2017-01-31 16:58:34.134929 +0900 1485849514134990 404 2
[HEAD] test test/test20.txt/ 0 0 2017-01-31 16:58:34.139119 +0900 1485849514139151 404 2
[BUCKET-GET] test/ test20.txt/ 0 0 2017-01-31 16:58:34.143923 +0900 1485849514143956 200 3
[HEAD] test test/test20.txt/_temporary/0 0 0 2017-01-31 16:58:34.148843 +0900 1485849514148889 404 2
[HEAD] test test/test20.txt/_temporary/0/ 0 0 2017-01-31 16:58:34.153007 +0900 1485849514153042 404 2
[BUCKET-GET] test/ test20.txt/_temporary/0/ 0 0 2017-01-31 16:58:34.157585 +0900 1485849514157609 200 2
[HEAD] test test/test20.txt/_temporary/0 0 0 2017-01-31 16:58:34.161161 +0900 1485849514161191 404 2
[HEAD] test test/test20.txt/_temporary/0/ 0 0 2017-01-31 16:58:34.165068 +0900 1485849514165102 404 2
[BUCKET-GET] test/ test20.txt/_temporary/0/ 0 0 2017-01-31 16:58:34.169758 +0900 1485849514169782 200 3
[HEAD] test test/test20.txt/_temporary 0 0 2017-01-31 16:58:34.173196 +0900 1485849514173232 404 2
[HEAD] test test/test20.txt/_temporary/ 0 0 2017-01-31 16:58:34.176176 +0900 1485849514176220 404 1
[BUCKET-GET] test/ test20.txt/_temporary/ 0 0 2017-01-31 16:58:34.180714 +0900 1485849514180744 200 3
[HEAD] test test/test20.txt 0 0 2017-01-31 16:58:34.183919 +0900 1485849514183952 404 1
[HEAD] test test/test20.txt/ 0 0 2017-01-31 16:58:34.187476 +0900 1485849514187502 404 2
[BUCKET-GET] test/ test20.txt/ 0 0 2017-01-31 16:58:34.191703 +0900 1485849514191733 200 2
[BUCKET-GET] test/ 0 0 2017-01-31 16:58:34.228879 +0900 1485849514228920 200 35
[PUT] test test/test20.txt/_temporary/0/ 0 0 2017-01-31 16:58:34.233079 +0900 1485849514233105 200 2
[PUT] test test/test20.txt/_temporary/0/_temporary/attempt_201701311658_0002_m_000023_103/part-00023 0 0 2017-01-31 16:58:34.359964 +0900 1485849514360034 200 2
[PUT] test test/test20.txt/_temporary/0/_temporary/attempt_201701311658_0002_m_000004_84/part-00004 0 0 2017-01-31 16:58:34.360232 +0900 1485849514360266 200 2
[PUT] test test/test20.txt/_temporary/0/_temporary/attempt_201701311658_0002_m_000032_112/part-00032 0 0 2017-01-31 16:58:34.360844 +0900 1485849514360875 200 2
...
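Most of the extra HEAD / BUCKET-GET traffic and the _temporary paths above come from Hadoop's FileOutputCommitter, which writes task output under _temporary and then renames it on commit (and on s3a a rename is implemented as copy + delete). One possible mitigation, not verified against LeoFS here, is the v2 commit algorithm, which removes one round of renames:

```python
# Possible mitigation (a sketch, not verified against LeoFS): the v2 commit
# algorithm merges task output into the destination at task-commit time,
# removing one level of directory renames during job commit. It reduces,
# but does not eliminate, the extra operations on s3a.
sc._jsc.hadoopConfiguration().set(
    "mapreduce.fileoutputcommitter.algorithm.version", "2")
```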
Write Issue
From time to time, the write can fail.
Logs
Normal spark_write.access.log.gz spark_write.spark.log.gz
Error spark_fail_write.access.log.gz spark_fail_write.spark.log.gz
Read Issue
Read requests are issued as Range GETs (since the input can be partitioned across multiple workers), so the cache in leo_gateway is not used.
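To reproduce this access pattern outside Spark, e.g. to observe how leo_gateway handles ranged responses, a ranged GET can be issued directly against the gateway. A minimal sketch with boto3, reusing the placeholder endpoint, bucket, key, and credentials from the test setup above:

```python
# Minimal sketch with boto3: issue the same kind of ranged GET that the s3a
# client sends; bucket, key, endpoint, and credentials are the placeholder
# values from the test setup above.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://127.0.0.1:12345",
    aws_access_key_id="05236",
    aws_secret_access_key="802562235",
)

# Fetch only the first 1 MiB, as a partitioned Spark task would.
resp = s3.get_object(Bucket="test", Key="test.txt", Range="bytes=0-1048575")
print(resp["ResponseMetadata"]["HTTPStatusCode"])  # expect 206 Partial Content
print(len(resp["Body"].read()))
```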
Test with Spark 2.1.0 + Hadoop 3.0.0-alpha2
The goal is to test with a newer version of Hadoop that includes performance improvements in its S3 support.
Logs
spark_210_hadoop_3a2_write.access.log.gz
The number of requests decreases from >3,000 to ~600 for one small write.
$ bin/pyspark
>>> sc.parallelize([1,2,3]).saveAsTextFile("s3a://test/testout.txt")
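A rough way to check the request count above is to count the lines in the gateway access log captured during the write (a sketch; it assumes one log line per request and that the log covers only this single write):

```
$ zcat spark_210_hadoop_3a2_write.access.log.gz | wc -l
```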
Small File Testing
**Data Set**: ~3,900 image files (total: ~170 MB)
Hadoop Setting
1x NameNode (+ Secondary NameNode), 3x DataNodes
Read from Hadoop
>>> sc.textFile("hdfs://leofs-ubuntu1404-node08:9000/images/*").count()
1209364
Duration: 9s
Metric | Min | 25th percentile | Median | 75th percentile | Max |
---|---|---|---|---|---|
Duration | 5 ms | 10 ms | 41 ms | 42 ms | 0.1 s |
GC Time | 0 ms | 0 ms | 0 ms | 0 ms | 51 ms |
Read from LeoFS
>>> sc.textFile("s3a://test/*").count()
1209364
Duration: 9s
Metric | Min | 25th percentile | Median | 75th percentile | Max |
---|---|---|---|---|---|
Duration | 10 ms | 16 ms | 41 ms | 45 ms | 0.1 s |
GC Time | 0 ms | 0 ms | 0 ms | 0 ms | 30 ms |
With a large data set (500 dirs x 1600 files), it took too long to list the objects. Known issue: https://github.com/leo-project/leofs/issues/548
It is quite difficult to work with the common use pattern, e.g. sc.textFile("s3a://test_bucket/*").
@windkit Yes, it's unavoidable in the case of sc.textFile("s3a://test_bucket/*") until #548 is fixed. However, that could be mitigated if sc.textFile("s3a://test_bucket/${DIR}/*") could be used instead, as the prefix match can be handled efficiently on the local node with metadata backed by LevelDB. Isn't that possible in your use case?
@mocchira Yes, that's the way I am trying to work around the bottleneck. Will update here later
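A sketch of that workaround: read each known prefix separately so listing stays a cheap prefix match, then union the per-directory RDDs instead of using a top-level `*` (the directory names below are hypothetical):

```python
# Sketch of the per-directory workaround; directory names are hypothetical.
dirs = ["dir000", "dir001", "dir002"]

# One RDD per prefix keeps each listing a prefix match on the storage node,
# avoiding the expensive bucket-wide listing of "s3a://test_bucket/*".
rdds = [sc.textFile("s3a://test_bucket/{}/*".format(d)) for d in dirs]
print(sc.union(rdds).count())
```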
Gateway Logs (Added Copy Log at info) spark_fail.access.txt spark_fail.info.txt
Storage Logs (Added prefix search log at info) spark_fail.saccess.txt spark_fail.sinfo.txt
- [ ] Investigate Write Problem
- [x] Complete Access Log on Gateway (Missing Copy, ...)
- [x] Complete Access Log on Storage (Prefix Search, ...)
- [x] Fix the directory cleanup and re-create issue https://github.com/leo-project/leofs/issues/783
- [ ] Performance Tuning
- [ ] Range Cache (Hadoop always uses Range Get)
Issue Summary
Directory deletion is done asynchronously; if the directory is re-created afterwards, an incorrect deletion can happen, similar to the bucket case: https://github.com/leo-project/leofs/issues/150
spark_fail_sinfo.txt spark_fail_access.txt spark_fail_info.txt spark_fail_saccess.txt
Related PR
https://github.com/leo-project/leofs/pull/782
The issue is fixed with Spark 2.2.0 + Hadoop 2.7.3, and with Spark 1.6.1 + Hadoop 2.6.3 using hadoop-aws-2.7.3.jar and aws-java-sdk-1.7.4.jar.
With s3a:// available, it may be possible to use cp / distcp to copy files to/from Hadoop.
https://community.hortonworks.com/questions/7165/how-to-copy-hdfs-file-to-aws-s3-bucket-hadoop-dist.html
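A sketch of what such a test could look like (the HDFS path and bucket reuse values from this survey; the -D overrides assume the s3a settings are not already in core-site.xml):

```
$ hadoop distcp \
    -Dfs.s3a.endpoint=http://127.0.0.1:12345 \
    -Dfs.s3a.access.key=05236 \
    -Dfs.s3a.secret.key=802562235 \
    hdfs://leofs-ubuntu1404-node08:9000/images \
    s3a://test/images
```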
TODO
- [ ] Test cp/distcp