[BUG] S3SingleDriverLogStore.listFrom takes a long time for large _delta_logs
Bug
Describe the problem
Reading from a Delta table with a large Delta log takes a long time when using S3SingleDriverLogStore. When calculating a snapshot of a Delta table, S3SingleDriverLogStore lists all files within the _delta_log key prefix. This can be expensive for Delta logs with many commits, which frequently happens when a streaming job is writing to the table.
To keep reads and writes fast, the Delta Log Protocol creates checkpoints of the Delta log, typically every 10th commit. To calculate a snapshot of a Delta table, we look at the latest checkpoint together with the commits which have happened since that checkpoint. This is done using the listFrom(path: Path) method of the LogStore interface. However, the existing implementation of listFrom in S3SingleDriverLogStore lists all keys within the _delta_log prefix and filters the result to only include lexicographically greater keys, see https://github.com/delta-io/delta/blob/fab016e874532cad4de17f1077dbdc860548be9a/storage/src/main/java/io/delta/storage/S3SingleDriverLogStore.java#L228. Listing the entire directory can be time consuming when the table contains many commits.
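To make the pattern concrete, here is a rough sketch (not the actual Delta source) of what such a naive listFrom amounts to; the helper name naiveListFrom and the FileSystem-based signature are assumed for illustration only:

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Illustrative only: list the whole parent directory, then keep the paths
// whose names are lexicographically at or after the requested path. The
// listing cost grows with the total number of files under _delta_log.
def naiveListFrom(fs: FileSystem, resolvedPath: Path): Iterator[FileStatus] = {
  val parent = resolvedPath.getParent
  fs.listStatus(parent) // lists every key under _delta_log
    .filter(_.getPath.getName.compareTo(resolvedPath.getName) >= 0)
    .sortBy(_.getPath.getName)
    .iterator
}
```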
It is possible to avoid listing all keys under a given key prefix by using the start-after (StartAfter) parameter of the S3 ListObjectsV2 API, so that S3 only returns keys lexicographically after a given key. We have applied a patch with this fix in https://github.com/jkylling/delta/commit/ec998ee9bc62b65c0f4be5ae8f38a5c5753b443c.
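As a hedged illustration of the idea (not the linked patch itself, which goes through the Hadoop S3A internals), the AWS SDK v2 sketch below lists only the keys after a given start key; the helper name listKeysFrom and the bucket/prefix/startKey values are placeholders:

```scala
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request
import scala.collection.mutable.ArrayBuffer

// List only the keys lexicographically after startKey instead of listing the
// whole prefix and filtering on the client side.
def listKeysFrom(s3: S3Client, bucket: String, prefix: String, startKey: String): Seq[String] = {
  val request = ListObjectsV2Request.builder()
    .bucket(bucket)       // e.g. "your-s3-bucket"
    .prefix(prefix)       // e.g. "table/_delta_log/"
    .startAfter(startKey) // S3 skips everything up to and including this key
    .build()
  val keys = ArrayBuffer.empty[String]
  // the paginator transparently follows continuation tokens across result pages
  s3.listObjectsV2Paginator(request).contents().forEach { obj => keys += obj.key(); () }
  keys.toSeq
}
```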
For the sample tables we tested the patch with, this brought the read time down from 20-30 seconds to around 5 seconds, with some occasional reads at 15 seconds. The reads at 15 seconds seemed to be related to the processing of new checkpoints, as we were streaming to the tables at the same time as we were reading.
I have not tested this for the other LogStore implementations, but from reading the code they appear to be affected by the same issue.
Steps to reproduce
- Create a Delta table
- Read from the Delta table, note the time
- Fill the Delta log with files (either junk or real commits)
- Read from the Delta table, note the time
The tests below can be used for the above steps. See also this commit.
```scala
val bucket = "s3a://your-s3-bucket"

test("1. create table") {
  spark
    .sparkContext
    .hadoopConfiguration
    .set(
      "fs.s3a.aws.credentials.provider",
      "com.amazonaws.auth.profile.ProfileCredentialsProvider"
    )
  spark.sql(s"CREATE TABLE delta_log_performance_test0 (value LONG) USING DELTA LOCATION '${bucket}_0'").collect()
  spark.sql(s"CREATE TABLE delta_log_performance_test1 (value LONG) USING DELTA LOCATION '${bucket}_1'").collect()
  spark.sql(s"CREATE TABLE delta_log_performance_test2 (value LONG) USING DELTA LOCATION '${bucket}_2'").collect()
}

test("2. fill bucket 2 _delta_log with garbage") {
  println(
    s"""
       |# In a shell run:
       |for i in {1..100000}; do touch .$$i.txt; done
       |aws s3 cp . ${bucket.replace("s3a://", "s3://")}_2/_delta_log/ --recursive
       |""".stripMargin)
}

test("3. time selects") {
  spark
    .sparkContext
    .hadoopConfiguration
    .set(
      "fs.s3a.aws.credentials.provider",
      "com.amazonaws.auth.profile.ProfileCredentialsProvider"
    )
  spark.sql(s"select * from delta.`${bucket}_0`") // warmup
  val start0 = System.currentTimeMillis()
  spark.sql(s"select * from delta.`${bucket}_1`")
  println(s"d1 = ${System.currentTimeMillis() - start0}")
  val start1 = System.currentTimeMillis()
  spark.sql(s"select * from delta.`${bucket}_2`")
  println(s"d2 = ${System.currentTimeMillis() - start1}")
}
```
A sample run of the "time selects" test gives
d1 = 1955
d2 = 12034
That is, reading identical tables takes 10 seconds longer when the Delta log contains more files. The same happens if the Delta log contains commits instead of junk files.
After applying the patch to S3SingleDriverLogStore we get
d1 = 1922
d2 = 1938
Observed results
Reading from a Delta table whose _delta_log contains many files takes a very long time.
Expected results
Reading from a Delta table with a _delta_log containing many files should take about the same amount of time as reading from one with few files. That is, the time to compute a snapshot should not be proportional to the number of files in the _delta_log.
Further details
The issue can be partially mitigated by setting the delta.stalenessLimit option to a large value. However, I believe the write path would still be affected by this issue, as it seems to force an update of the snapshot before every write.
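A minimal sketch of that mitigation, assuming the full Spark SQL configuration key is spark.databricks.delta.stalenessLimit and with a purely illustrative value:

```scala
// Allow queries to reuse a cached snapshot that is up to 10 minutes stale
// instead of re-listing _delta_log on every read (mitigates reads only;
// writes still appear to force a snapshot update).
spark.conf.set("spark.databricks.delta.stalenessLimit", "10m")
```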
The same issue can be observed on a local file system by filling the _delta_log directory with junk (or several commits). This issue might affect all LogStores with naive implementations of listFrom.
Environment information
- Delta Lake version: 1.3.0
- Spark version: 3.2.1
- Scala version: 2.12
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- [ ] Yes. I can contribute a fix for this bug independently.
- [x] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- [ ] No. I cannot contribute a bug fix at this time.
I would be happy to contribute to get this issue fixed. The suggested patch might need some work, as it digs quite deep into the AWS Hadoop API.
Hi @jkylling, thanks for this issue! We will look into this and get back to you.
Hi @jkylling, this approach LGTM. Want to make an official PR?
Hi @scottsand-db. Thanks for looking into this. I'll open a PR.
This seems to enforce S3A. Does it break EMRFS or other Hadoop FileSystem implementations?
@shenavaa Yup, this only works for the S3A file system. If you are using EMRFS, then you shouldn't enable this feature.
@scottsand-db There's no option to disable it, and it's already in the 2.3.0 milestone!
Closed by https://github.com/delta-io/delta/commit/c156c9814156dcdbc0524f6e1101accc265fb1b9
@shenavaa the feature is disabled by default. You have to explicitly enable it using delta.enableFastS3AListFrom.
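For reference, one way to opt in, assuming the flag is read from the Hadoop configuration under the key delta.enableFastS3AListFrom (please check the release notes for your Delta version before relying on this):

```scala
// Opt in to the faster S3A listFrom (Delta 2.3.0+, S3A file system only).
spark.sparkContext.hadoopConfiguration.set("delta.enableFastS3AListFrom", "true")
```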