
[Survey] Spark (Hadoop) Support via S3 API

Open windkit opened this issue 8 years ago • 13 comments


As LeoFS is good at handling small files (images, logs, ...), it may fill the gap left by HDFS, which does not work well with small files.


Spark 1.6.1 (Hadoop 2.6.2)

Extra Libraries

  • hadoop-aws-2.7.1.jar
  • aws-java-sdk-1.7.4.jar


./pyspark --jars /opt/spark-1.6.1-bin-hadoop2.6/lib/hadoop-aws-2.7.1.jar,/opt/spark-1.6.1-bin-hadoop2.6/lib/aws-java-sdk-1.7.4.jar --master yarn-client --num-executors 10
>>> sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "")
>>> sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "05236")
>>> sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "802562235")
>>> sc.textFile("s3a://test/test.txt").count()
17/01/31 16:15:12 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 222.4 KB, free 465.2 KB)
17/01/31 16:15:12 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 21.2 KB, free 486.4 KB)
17/01/31 16:15:12 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on (size: 21.2 KB, free: 511.1 MB)
17/01/31 16:15:12 INFO SparkContext: Created broadcast 1 from textFile at NativeMethodAccessorImpl.java:-2
17/01/31 16:15:13 INFO FileInputFormat: Total input paths to process : 1
17/01/31 16:15:13 INFO SparkContext: Starting job: count at <stdin>:1
17/01/31 16:15:13 INFO DAGScheduler: Got job 0 (count at <stdin>:1) with 2 output partitions
17/01/31 16:15:13 INFO DAGScheduler: Final stage: ResultStage 0 (count at <stdin>:1)
17/01/31 16:15:13 INFO DAGScheduler: Parents of final stage: List()
17/01/31 16:15:13 INFO DAGScheduler: Missing parents: List()
17/01/31 16:15:13 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[5] at count at <stdin>:1), which has no missing parents
17/01/31 16:15:13 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 5.3 KB, free 491.8 KB)
17/01/31 16:15:13 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.4 KB, free 495.2 KB)
17/01/31 16:15:13 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on (size: 3.4 KB, free: 511.1 MB)
17/01/31 16:15:13 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
17/01/31 16:15:13 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (PythonRDD[5] at count at <stdin>:1)
17/01/31 16:15:13 INFO YarnScheduler: Adding task set 0.0 with 2 tasks
17/01/31 16:15:13 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, spark04, partition 0,PROCESS_LOCAL, 2247 bytes)
17/01/31 16:15:13 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, spark02, partition 1,PROCESS_LOCAL, 2247 bytes)
17/01/31 16:15:14 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on spark02:46011 (size: 3.4 KB, free: 511.5 MB)
17/01/31 16:15:14 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on spark02:46011 (size: 21.2 KB, free: 511.5 MB)
17/01/31 16:15:14 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on spark04:45269 (size: 3.4 KB, free: 511.5 MB)
17/01/31 16:15:14 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on spark04:45269 (size: 21.2 KB, free: 511.5 MB)
17/01/31 16:15:16 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3292 ms on spark04 (1/2)
17/01/31 16:15:18 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 5347 ms on spark02 (2/2)
17/01/31 16:15:18 INFO DAGScheduler: ResultStage 0 (count at <stdin>:1) finished in 5.365 s
17/01/31 16:15:18 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/01/31 16:15:18 INFO DAGScheduler: Job 0 finished: count at <stdin>:1, took 5.413051 s
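
The same three s3a settings can also be passed at launch time instead of being set through `sc._jsc`. The sketch below builds `--conf` arguments using Spark's `spark.hadoop.` prefix, which copies a property into the Hadoop configuration. The helper name `s3a_conf_args` and the placeholder endpoint and keys are hypothetical, not values from this test.

```python
def s3a_conf_args(endpoint, access_key, secret_key):
    """Build --conf arguments that set the s3a options used in the session above.

    Spark copies any property prefixed with "spark.hadoop." into the
    Hadoop configuration, so this mirrors the three set() calls.
    """
    settings = {
        "fs.s3a.endpoint": endpoint,
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
    }
    args = []
    for key, value in settings.items():
        args += ["--conf", f"spark.hadoop.{key}={value}"]
    return args


# Placeholder values (assumptions): substitute the LeoFS gateway address and real keys.
example_args = s3a_conf_args("gateway.example:8080", "ACCESS", "SECRET")
print(" ".join(["./pyspark"] + example_args))
```

With this, the settings apply to every job in the session without touching the private `_jsc` handle.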

windkit avatar Jan 31 '17 07:01 windkit

Write Performance Issue

Writing a file through s3a:// does not map well onto the S3 API in terms of performance. A single simple write involves more than 3000 operations against LeoFS.

[HEAD]  test    test/test20.txt 0       0       2017-01-31 16:58:34.134929 +0900        1485849514134990        404     2
[HEAD]  test    test/test20.txt/        0       0       2017-01-31 16:58:34.139119 +0900        1485849514139151        404     2
[BUCKET-GET]    test/   test20.txt/     0       0       2017-01-31 16:58:34.143923 +0900        1485849514143956        200     3
[HEAD]  test    test/test20.txt/_temporary/0    0       0       2017-01-31 16:58:34.148843 +0900        1485849514148889        404     2
[HEAD]  test    test/test20.txt/_temporary/0/   0       0       2017-01-31 16:58:34.153007 +0900        1485849514153042        404     2
[BUCKET-GET]    test/   test20.txt/_temporary/0/        0       0       2017-01-31 16:58:34.157585 +0900        1485849514157609        200     2
[HEAD]  test    test/test20.txt/_temporary/0    0       0       2017-01-31 16:58:34.161161 +0900        1485849514161191        404     2
[HEAD]  test    test/test20.txt/_temporary/0/   0       0       2017-01-31 16:58:34.165068 +0900        1485849514165102        404     2
[BUCKET-GET]    test/   test20.txt/_temporary/0/        0       0       2017-01-31 16:58:34.169758 +0900        1485849514169782        200     3
[HEAD]  test    test/test20.txt/_temporary      0       0       2017-01-31 16:58:34.173196 +0900        1485849514173232        404     2
[HEAD]  test    test/test20.txt/_temporary/     0       0       2017-01-31 16:58:34.176176 +0900        1485849514176220        404     1
[BUCKET-GET]    test/   test20.txt/_temporary/  0       0       2017-01-31 16:58:34.180714 +0900        1485849514180744        200     3
[HEAD]  test    test/test20.txt 0       0       2017-01-31 16:58:34.183919 +0900        1485849514183952        404     1
[HEAD]  test    test/test20.txt/        0       0       2017-01-31 16:58:34.187476 +0900        1485849514187502        404     2
[BUCKET-GET]    test/   test20.txt/     0       0       2017-01-31 16:58:34.191703 +0900        1485849514191733        200     2
[BUCKET-GET]    test/           0       0       2017-01-31 16:58:34.228879 +0900        1485849514228920        200     35
[PUT]   test    test/test20.txt/_temporary/0/   0       0       2017-01-31 16:58:34.233079 +0900        1485849514233105        200     2
[PUT]   test    test/test20.txt/_temporary/0/_temporary/attempt_201701311658_0002_m_000023_103/part-00023       0       0       2017-01-31 16:58:34.359964 +0900        1485849514360034     200      2
[PUT]   test    test/test20.txt/_temporary/0/_temporary/attempt_201701311658_0002_m_000004_84/part-00004        0       0       2017-01-31 16:58:34.360232 +0900        1485849514360266     200      2
[PUT]   test    test/test20.txt/_temporary/0/_temporary/attempt_201701311658_0002_m_000032_112/part-00032       0       0       2017-01-31 16:58:34.360844 +0900        1485849514360875     200      2
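
To see where the operations go, the access log can be tallied by operation type. This is a minimal sketch that assumes only what the excerpt above shows, namely that each line starts with the operation in square brackets; the function name is hypothetical and the sample lines are truncated to their first fields.

```python
import re
from collections import Counter

# Assumption: every access-log line begins with the operation in brackets,
# e.g. "[HEAD]" or "[BUCKET-GET]", as in the excerpt above.
METHOD_RE = re.compile(r"^\[([A-Z-]+)\]")


def count_operations(lines):
    """Count access-log lines per operation type, skipping unparsable lines."""
    counts = Counter()
    for line in lines:
        match = METHOD_RE.match(line.strip())
        if match:
            counts[match.group(1)] += 1
    return counts


# Leading fields of a few lines from the excerpt (remaining columns omitted).
sample = [
    "[HEAD]  test    test/test20.txt",
    "[HEAD]  test    test/test20.txt/",
    "[BUCKET-GET]    test/   test20.txt/",
    "[PUT]   test    test/test20.txt/_temporary/0/",
]
for operation, count in count_operations(sample).most_common():
    print(operation, count)
```

Running it over the full gateway access log gives the split between HEAD, BUCKET-GET and PUT requests for one write.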


windkit avatar Jan 31 '17 08:01 windkit

Write Issue

From time to time, the write fails.


Normal: spark_write.access.log.gz, spark_write.spark.log.gz

Error: spark_fail_write.access.log.gz, spark_fail_write.spark.log.gz

windkit avatar Jan 31 '17 08:01 windkit

Read Issue

Read requests are issued as Range GETs, since tasks can be partitioned across multiple workers. The cache in leo_gateway is not used for them.
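
As a rough illustration of why one file turns into several Range GETs, the sketch below divides an object into contiguous byte ranges, one per task, and formats each as an HTTP `Range` header value. It is a simplified model with a hypothetical function name; Hadoop's real split computation depends on the configured split size and record boundaries.

```python
def split_ranges(size, num_splits):
    """Divide an object of `size` bytes into HTTP Range header values.

    Simplified model (assumption): equal-sized contiguous splits, one per task.
    """
    if size <= 0 or num_splits <= 0:
        return []
    chunk = -(-size // num_splits)  # ceiling division
    ranges = []
    for start in range(0, size, chunk):
        end = min(start + chunk, size) - 1  # Range end offsets are inclusive
        ranges.append(f"bytes={start}-{end}")
    return ranges


# Two tasks reading a 10-byte object would request one range each.
print(split_ranges(10, 2))
```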

windkit avatar Feb 01 '17 08:02 windkit

Test with Spark 2.1.0 + Hadoop 3.0.0-alpha2

Tested with a newer version of Hadoop, which includes performance improvements to its S3 support.



The number of requests decreases from >3000 to ~600 for one small write.

$ bin/pyspark
>>> sc.parallelize([1,2,3]).saveAsTextFile("s3a://test/testout.txt")

windkit avatar Feb 09 '17 04:02 windkit

Small File Testing

Data Set: ~3900 image files (total: ~170 MB)

Hadoop Setting

1x Name node (+Secondary Name Node)
3x Data node

Read from Hadoop

>>> sc.textFile("hdfs://leofs-ubuntu1404-node08:9000/images/*").count()

Duration: 9s

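| Metric   | Min  | 25th percentile | Median | 75th percentile | Max   |
|----------|------|-----------------|--------|-----------------|-------|
| Duration | 5 ms | 10 ms           | 41 ms  | 42 ms           | 0.1 s |
| GC Time  | 0 ms | 0 ms            | 0 ms   | 0 ms            | 51 ms |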

Read from LeoFS

>>> sc.textFile("s3a://test/*").count()

Duration: 9s

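| Metric   | Min   | 25th percentile | Median | 75th percentile | Max   |
|----------|-------|-----------------|--------|-----------------|-------|
| Duration | 10 ms | 16 ms           | 41 ms  | 45 ms           | 0.1 s |
| GC Time  | 0 ms  | 0 ms            | 0 ms   | 0 ms            | 30 ms |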

windkit avatar Feb 09 '17 05:02 windkit

With a large data set (500 dirs x 1600 files), listing the objects took too long. Known issue: https://github.com/leo-project/leofs/issues/548

This makes it quite difficult to work with common usage patterns, e.g. sc.textFile("s3a://test_bucket/*").

windkit avatar Mar 15 '17 00:03 windkit

@windkit Yes, it's unavoidable for sc.textFile("s3a://test_bucket/*") until #548 is fixed. However, it could be mitigated if sc.textFile("s3a://test_bucket/${DIR}/*") can be used instead, as the prefix match can be done effectively on the local node with metadata backed by leveldb. Would that be possible in your use case?
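
A minimal sketch of the per-directory pattern, assuming the directory names are known in advance: `sc.textFile` accepts a comma-separated list of paths, so one glob per directory can be joined into a single argument. The helper name and the directory names below are hypothetical.

```python
def per_directory_globs(bucket, directories):
    """Join one s3a glob per directory into a comma-separated path list."""
    return ",".join(
        f"s3a://{bucket}/{directory.strip('/')}/*" for directory in directories
    )


# Hypothetical directory names; in practice these come from the application.
paths = per_directory_globs("test_bucket", ["dir001", "dir002"])
print(paths)
# With a live SparkContext the result would be used as: sc.textFile(paths).count()
```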

mocchira avatar Mar 15 '17 01:03 mocchira

@mocchira Yes, that's the way I am trying to work around the bottleneck. Will update here later

windkit avatar Mar 15 '17 01:03 windkit

Gateway Logs (Added Copy Log at info) spark_fail.access.txt spark_fail.info.txt

Storage Logs (Added prefix search log at info) spark_fail.saccess.txt spark_fail.sinfo.txt

windkit avatar Jul 03 '17 08:07 windkit

  • [ ] Investigate Write Problem
    • [x] Complete Access Log on Gateway (Missing Copy, ...)
    • [x] Complete Access Log on Storage (Prefix Search, ...)
    • [x] Fix the directory cleanup and re-create issue https://github.com/leo-project/leofs/issues/783
  • [ ] Performance Tuning
    • [ ] Range Cache (Hadoop always uses Range Get)

windkit avatar Jul 03 '17 23:07 windkit

Issue Summary

Directory deletion is done asynchronously. If the directory is re-created afterwards, incorrect deletion will happen, similar to the bucket case: https://github.com/leo-project/leofs/issues/150
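
The race can be reproduced in a toy model. The sketch below is not LeoFS code: `ToyStore`, its logical clock, and the timestamp guard are all assumptions made for illustration, and the guard is not claimed to be the fix that was applied.

```python
class ToyStore:
    """Toy object store with queued (asynchronous) directory deletion."""

    def __init__(self, guard_by_timestamp=False):
        self.objects = {}   # key -> logical time of the PUT
        self.pending = []   # queued (prefix, logical time of the DELETE)
        self.clock = 0
        self.guard = guard_by_timestamp

    def _tick(self):
        self.clock += 1
        return self.clock

    def put(self, key):
        self.objects[key] = self._tick()

    def delete_dir(self, prefix):
        # The deletion is only queued here; it runs later in run_pending().
        self.pending.append((prefix, self._tick()))

    def run_pending(self):
        for prefix, deleted_at in self.pending:
            for key in list(self.objects):
                if not key.startswith(prefix):
                    continue
                # Hypothetical guard: keep objects written after the delete request.
                if self.guard and self.objects[key] > deleted_at:
                    continue
                del self.objects[key]
        self.pending.clear()


def recreate_scenario(guard):
    """Delete a directory, re-create it, then let the queued deletion run."""
    store = ToyStore(guard)
    store.put("out/_temporary/0/part-0")
    store.delete_dir("out/_temporary/")
    store.put("out/_temporary/0/part-1")  # directory re-created before cleanup runs
    store.run_pending()
    return sorted(store.objects)


print(recreate_scenario(guard=False))  # the re-created object is lost too
print(recreate_scenario(guard=True))   # only the older object is removed
```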

spark_fail_sinfo.txt spark_fail_access.txt spark_fail_info.txt spark_fail_saccess.txt

Related PR


windkit avatar Jul 05 '17 04:07 windkit

Issue fixed with Spark 2.2.0 + Hadoop 2.7.3 and Spark 1.6.1 + Hadoop 2.6.3 with hadoop-aws-2.7.3.jar and aws-java-sdk-1.7.4.jar

windkit avatar Aug 24 '17 00:08 windkit

With s3a:// available, it may be possible to use cp / distcp to copy files to/from Hadoop.



  • [ ] Test cp/distcp

windkit avatar Jan 11 '18 01:01 windkit