
[BUG] S3SingleDriverLogStore.listFrom takes a long time for large _delta_logs


Bug

Describe the problem

Reading from a Delta table with a large Delta log takes a long time when using S3SingleDriverLogStore. When calculating a snapshot of a Delta table, S3SingleDriverLogStore lists all files under the _delta_log key prefix. This can be expensive for Delta logs with many commits, which frequently happens when a streaming job is writing to the table.

To keep reads and writes fast, the Delta log protocol creates checkpoints of the Delta log, typically every 10th commit. To calculate a snapshot of a Delta table, we look at the latest checkpoint together with the commits which have happened since. This is done using the listFrom(path: Path) method of the LogStore interface. However, the existing implementation of listFrom in S3SingleDriverLogStore lists all keys within the _delta_log prefix and filters the result to only include lexicographically greater keys, see https://github.com/delta-io/delta/blob/fab016e874532cad4de17f1077dbdc860548be9a/storage/src/main/java/io/delta/storage/S3SingleDriverLogStore.java#L228. Listing the entire directory can be time consuming when the table contains many commits.
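The list-then-filter behavior described above can be sketched in plain Java. This is an illustration only, with hypothetical class and method names rather than Delta Lake APIs: the store first materializes every key under the _delta_log prefix (for S3, a paginated LIST of the whole prefix) and only then discards the keys below the start path.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the naive listFrom: the expensive part is
// producing allKeysUnderPrefix, which for S3 means paging through the
// entire _delta_log prefix, 1000 keys per request.
public class NaiveListFrom {
    // Returns all keys lexicographically >= startKey, in sorted order.
    public static List<String> listFrom(List<String> allKeysUnderPrefix, String startKey) {
        List<String> result = new ArrayList<>();
        for (String key : allKeysUnderPrefix) {
            if (key.compareTo(startKey) >= 0) { // filter happens client-side
                result.add(key);
            }
        }
        Collections.sort(result);
        return result;
    }
}
```

Note that the filtering itself is cheap; the cost is proportional to the total number of keys listed, not to the number of keys actually returned.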

It is possible to avoid listing all keys under a given key prefix by using the startKey parameter of the S3 ListObjects V2 API. We have applied a patch with this fix in https://github.com/jkylling/delta/commit/ec998ee9bc62b65c0f4be5ae8f38a5c5753b443c.
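The effect of the startAfter parameter can be modeled with an in-memory sorted set. This is a sketch of the semantics, not the actual patch: with startAfter, S3 itself skips keys up to and including the given key, so the client never pages through the older part of the prefix. One subtlety (hedged here, not taken from the patch) is that startAfter is exclusive, so an implementation that needs the start key itself in the result must account for that separately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of S3 ListObjectsV2 startAfter semantics using a
// sorted set standing in for the bucket's key space.
public class StartAfterListing {
    // tailSet(startAfter, false) yields keys strictly greater than
    // startAfter, matching ListObjectsV2's "start listing after this
    // key" contract.
    public static List<String> listAfter(TreeSet<String> keysInBucket, String startAfter) {
        return new ArrayList<>(keysInBucket.tailSet(startAfter, false));
    }
}
```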

For the sample tables we tested, the patch brought the read time down from 20-30 seconds to around 5 seconds, with occasional reads taking 15 seconds. The slower reads seemed to be related to the processing of new checkpoints, as we were streaming to the tables at the same time as we were reading.

I have not tested this for the other LogStore implementations, but judging from the code they appear to be affected by the same issue.

Steps to reproduce

  1. Create a Delta table
  2. Read from the Delta table and note the time
  3. Fill the Delta log with files (either junk or real commits)
  4. Read from the Delta table again and note the time

The tests below can be used for the above steps. See also this commit.

  val bucket = "s3a://your-s3-bucket"

  test("1. create table") {
    spark
      .sparkContext
      .hadoopConfiguration
      .set(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.profile.ProfileCredentialsProvider"
      )

    spark.sql(s"CREATE TABLE delta_log_performance_test0 (value LONG) USING DELTA LOCATION '${bucket}_0'").collect()
    spark.sql(s"CREATE TABLE delta_log_performance_test1 (value LONG) USING DELTA LOCATION '${bucket}_1'").collect()
    spark.sql(s"CREATE TABLE delta_log_performance_test2 (value LONG) USING DELTA LOCATION '${bucket}_2'").collect()
  }

  test("2. fill bucket 2 _delta_log with garbage") {
    println(
      s"""
        |# In a shell run:
        |for i in {1..100000}; do touch .$$i.txt; done
        |aws s3 cp . ${bucket.replace("s3a://", "s3://")}_2/_delta_log/ --recursive
        |""".stripMargin)
  }

  test("3. time selects") {
    spark
      .sparkContext
      .hadoopConfiguration
      .set(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.profile.ProfileCredentialsProvider"
      )

    spark.sql(s"select * from delta.`${bucket}_0`") // warmup

    val start0 = System.currentTimeMillis()
    spark.sql(s"select * from delta.`${bucket}_1`")
    println(s"d1 = ${System.currentTimeMillis() - start0}")

    val start1 = System.currentTimeMillis()
    spark.sql(s"select * from delta.`${bucket}_2`")
    println(s"d2 = ${System.currentTimeMillis() - start1}")
  }

A sample run of the "time selects" test gives

d1 = 1955
d2 = 12034

That is, reading identical tables takes 10 seconds longer when the delta log contains more files. The same happens if the delta log contains commits instead of junk files.

After applying the patch to S3SingleDriverLogStore we get

d1 = 1922
d2 = 1938 

Observed results

Reading from a Delta table whose _delta_log contains many files takes a long time.

Expected results

Reading from a Delta table with a _delta_log with many files should take the same amount of time as reading from a _delta_log with few files. That is, the time to get a snapshot should not be proportional to the number of files in the _delta_log.

Further details

The issue can be partially mitigated by setting the delta.stalenessLimit option to a large value. However, I believe the write path would still be affected by this issue, as it seems to force an update of the snapshot before every write.

The same issue can be observed on a local file system by filling the _delta_log directory with junk (or several commits). This issue might affect all LogStores with naive implementations of listFrom.
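The same list-then-filter shape is easy to see on a local filesystem as well. A minimal sketch (hypothetical names, mirroring the pattern in Hadoop-based listFrom implementations, not actual Delta Lake code):

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical local-filesystem analogue of the naive listFrom: every
// entry in the directory is enumerated, so the cost is proportional to
// the total number of files, not to the number of files >= startName.
public class LocalNaiveListFrom {
    public static List<String> listFrom(File dir, String startName) {
        List<String> names = new ArrayList<>();
        File[] all = dir.listFiles(); // enumerates the whole directory
        if (all != null) {
            for (File f : all) {
                if (f.getName().compareTo(startName) >= 0) {
                    names.add(f.getName());
                }
            }
        }
        names.sort(null); // natural (lexicographic) order
        return names;
    }
}
```

Filling the directory with junk files slows this down even though none of them survive the filter, which matches the behavior observed in the reproduction steps above.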

Environment information

  • Delta Lake version: 1.3.0
  • Spark version: 3.2.1
  • Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • [ ] Yes. I can contribute a fix for this bug independently.
  • [x] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • [ ] No. I cannot contribute a bug fix at this time.

I would be happy to contribute to get this issue fixed. The suggested patch might need some work, as it digs quite deep into the AWS Hadoop API.

jkylling avatar Jun 10 '22 18:06 jkylling

Hi @jkylling, thanks for this issue! We will look into this and get back to you.

scottsand-db avatar Jun 13 '22 17:06 scottsand-db

Hi @jkylling, this approach LGTM. Want to make an official PR?

scottsand-db avatar Jun 14 '22 23:06 scottsand-db

Hi @scottsand-db . Thanks for looking into this. I'll open a PR.

jkylling avatar Jun 15 '22 20:06 jkylling

This seems to enforce s3a. Doesn't it break EMRFS or other Hadoop FileSystem implementations?

shenavaa avatar Dec 16 '22 13:12 shenavaa

@shenavaa This only works for S3A file system, yup. If you are using EMRFS, then you shouldn't enable this feature.

scottsand-db avatar Dec 19 '22 14:12 scottsand-db

@scottsand-db There is no option to disable it, and it's already in the 2.3.0 milestone!

shenavaa avatar Dec 20 '22 06:12 shenavaa

Closed by https://github.com/delta-io/delta/commit/c156c9814156dcdbc0524f6e1101accc265fb1b9

scottsand-db avatar Dec 20 '22 15:12 scottsand-db

@shenavaa the feature is automatically disabled by default. You have to explicitly enable it using delta.enableFastS3AListFrom

scottsand-db avatar Dec 20 '22 15:12 scottsand-db