
[BUG] Delta table on S3 doesn't delete historical log files (using GLUE and EMR)

Open · maor-sa opened this issue on Jul 28 '22 · 12 comments

Bug

The Delta table property 'delta.logRetentionDuration'='interval 7 days' doesn't take effect when using the AWS Glue catalog (external table) and EMR.

Describe the problem

We are using open-source Delta Lake (io.delta:delta-core_2.12:1.0.0) on AWS EMR (6.5.0) with Spark (3.1.2), integrated with S3, Glue, and Athena, with the following table properties:

TBLPROPERTIES (
  'delta.autoOptimize.autoCompact'='true', 
  'delta.autoOptimize.optimizeWrite'='true', 
  'delta.compatibility.symlinkFormatManifest.enabled'='true', 
  'delta.logRetentionDuration'='interval 7 days', 
  'spark.databricks.delta.retentionDurationCheck.enabled'='true', 
  'spark.sql.create.version'='3.1.2-amzn-1', 
  'spark.sql.partitionProvider'='catalog', 
  'spark.sql.sources.schema.numPartCols'='1', 
  'spark.sql.sources.schema.numParts'='1', 

As mentioned in the official docs (delta-vacuum, delta-utility):

For the Log files - "Log files are deleted automatically and asynchronously after checkpoint operations. The default retention period of log files is 30 days, configurable through the delta.logRetentionDuration property which you set with the ALTER TABLE SET TBLPROPERTIES SQL method."

I set the table property as suggested, but log files older than 7 days are not deleted automatically, nor after I execute deltaTable.vacuum(retentionDuration) or deltaTable.generate("symlink_format_manifest"). Log files location: s3://xxxx/database/my_table/_delta_log/
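
For reference, a minimal sketch (placeholder path, assuming the spark-shell setup from the steps below) of the ALTER TABLE call the docs describe:

// Minimal sketch (placeholder path): setting the retention property through
// Spark SQL on the Delta table itself, per the docs quoted above.
spark.sql("""
ALTER TABLE delta.`s3://xxxx/database/my_table/`
SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 7 days')
""")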

Steps to reproduce

  1. Launch AWS EMR cluster

  2. Launch spark-shell: spark-shell --packages io.delta:delta-core_2.12:1.0.0 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.sql.caseSensitive=true

  3. Initialize the Delta table with data

:paste
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions.{col, from_json}

val input = "s3://my_bucket/initial_data/"

val schema: StructType = StructType(
  StructField("ts", StringType, nullable = true) ::
  StructField("database", StringType, nullable = true) ::
  StructField("type", StringType, nullable = true) ::
  StructField("data", StringType, nullable = true) ::
  StructField("table", StringType, nullable = true) :: Nil)

// Schema used to pull the "id" field out of the JSON payload in "data".
val key: StructType = StructType(
  StructField("id", StringType, nullable = true) :: Nil)

val df2 = spark.read.schema(schema).json(input)
  .withColumn("struct", from_json(col("data"), key))
  .select("table", "struct.id", "ts", "database", "type", "data")
df2.printSchema()


:paste
val output = "s3://my_bucket/my_db/delta_table/"

df2
  .write
  .partitionBy("table")
  .format("delta")
  .save(output)
  4. Create the Glue external table
:paste
import io.delta.tables.DeltaTable

val tbl = "delta_table"
val bucket = "my_bucket"
val db = "my_db"
// Same path the Delta table was written to in step 3.
val output_path = s"s3://${bucket}/${db}/${tbl}/"
val deltaTable = DeltaTable.forPath(output_path)

deltaTable.generate("symlink_format_manifest")

spark.sql(s"""
CREATE EXTERNAL TABLE ${db}.${tbl}
(id string, ts string, database string, type string, data string)
PARTITIONED BY (table string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '${output_path}_symlink_format_manifest/'
""")

// Note: these ALTER TABLE statements run against the Glue external (symlink)
// table, so the properties are stored in the Glue metastore.
spark.sql(s"""ALTER TABLE ${db}.${tbl} SET TBLPROPERTIES('delta.compatibility.symlinkFormatManifest.enabled'='true')""")
spark.sql(s"""ALTER TABLE ${db}.${tbl} SET TBLPROPERTIES('delta.logRetentionDuration'='interval 1 hours')""")
spark.sql(s"""ALTER TABLE ${db}.${tbl} SET TBLPROPERTIES('delta.deletedFileRetentionDuration'='interval 1 hours')""")
spark.sql(s"""ALTER TABLE ${db}.${tbl} SET TBLPROPERTIES('spark.databricks.delta.retentionDurationCheck.enabled'='true')""")
spark.sql(s"""ALTER TABLE ${db}.${tbl} SET TBLPROPERTIES('delta.autoOptimize.optimizeWrite'='true')""")
spark.sql(s"""ALTER TABLE ${db}.${tbl} SET TBLPROPERTIES('delta.autoOptimize.autoCompact'='true')""")

spark.sql(s"MSCK REPAIR TABLE ${db}.${tbl}")
  5. Go to Athena and check that the table exists
SELECT
    "table", count(*)
FROM
    my_db.delta_table
GROUP BY
    1
ORDER BY
    count(*)
  6. Go back to spark-shell and update the Delta table in order to generate new log files.
import io.delta.tables._

// Re-read the input; `schema`, `key`, `input`, and `output_path` come from
// the earlier pastes in the same spark-shell session.
val df = spark.read.schema(schema).json(input)
  .withColumn("struct", from_json(col("data"), key))
  .select("table", "struct.id", "ts", "database", "type", "data")

val dt = DeltaTable.forPath(output_path)
val dmb: DeltaMergeBuilder = dt.as("dt_source")
  .merge(df.as("df_new_data"), "dt_source.table = df_new_data.table and dt_source.id = df_new_data.id")

dmb
  .whenMatched.updateAll()
  .whenNotMatched.insertAll()
  .execute()
  7. Repeat step 6 multiple times to produce multiple log files at s3://my_bucket/my_db/delta_table/_delta_log/

  8. After the retention duration has passed, expect NOT to see the old log files (see the listing sketch right after these steps).
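
A quick way to check, from the same spark-shell session, which commit and checkpoint files remain (a sketch reusing `output` from step 3):

import org.apache.hadoop.fs.Path

// Sketch (reuses `output` from step 3): list _delta_log and print each file's
// modification time, size, and name, mirroring an `aws s3 ls` style listing.
val logPath = new Path(output + "_delta_log/")
val fs = logPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.listStatus(logPath)
  .sortBy(_.getPath.getName)
  .foreach(s => println(s"${s.getModificationTime}  ${s.getLen}  ${s.getPath.getName}"))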

Observed results

Nothing happens

Expected results

I would expect historical and untracked log files to be deleted after I execute deltaTable.vacuum(retentionDuration) or deltaTable.generate("symlink_format_manifest").

Further details

Environment information

  • Delta Lake version: io.delta:delta-core_2.12:1.0.0
  • Spark version: 3.1.2
  • Scala version: 2.12
  • EMR version: 6.5.0

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • [ V ] No. I cannot contribute a bug fix at this time.

maor-sa · Jul 28 '22 12:07

Hey @maor-sa, let me first clarify a few things. deltaTable.vacuum(retentionDuration) only removes data files, not the Delta log files (see Data retention). Delta log files are instead removed automatically after checkpoints (based on the table property you've set, 'delta.logRetentionDuration'='interval 7 days').

Checkpoints are generated every 10 commits by default (that is, every 10 operations on the table), but this can be configured with the table property delta.checkpointInterval. Can you confirm that you are writing to the table after the 7-day duration (or whatever interval you set) has passed, that a checkpoint is being generated, and that you still aren't seeing the older log files removed? You can check whether a checkpoint has been generated by looking in the _delta_log directory for the Parquet checkpoint files.

If you aren't generating a checkpoint after the log retention duration has elapsed, then you are seeing the expected behavior. By the way, deltaTable.generate("symlink_format_manifest") isn't an operation on the table, so it wouldn't generate a checkpoint or trigger log cleanup.
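
If you want checkpoints (and with them, log cleanup) to trigger sooner while testing, something along these lines should work (the path is a placeholder for your table):

// Hedged sketch (placeholder path): lower delta.checkpointInterval so a
// checkpoint is written every 2 commits instead of the default 10; the
// log cleanup runs as part of checkpointing.
spark.sql("""
ALTER TABLE delta.`s3://my_bucket/my_db/delta_table/`
SET TBLPROPERTIES ('delta.checkpointInterval' = '2')
""")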

allisonport-db · Jul 29 '22 00:07

Hey @allisonport-db

I can confirm everything:

  1. I know and expect that Delta log files will be removed automatically only after a checkpoint is created and the 7-day duration has passed. Unfortunately, they aren't.
  2. I also approached official AWS support; they replicated the issue and replied, "Currently there is no Databricks integration on EMR which is natively supported."

Thanks

maor-sa · Jul 29 '22 08:07

Okay thanks for confirming. We will look into this.

allisonport-db · Jul 29 '22 18:07

Do you have any Delta tables older than 30 days? If so, can you check whether the log files are being removed for those after the 30-day interval?

Also keep in mind, we don't necessarily remove all log files older than logRetentionDuration, as some may still be needed for time travel.

Could you please provide the _delta_log directory listing for one of the tables?
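
For context, a hedged illustration of the kind of read that keeps older log files reachable (the version number is arbitrary and the path is a placeholder):

// Hedged illustration: a time-travel read reconstructs an old snapshot from
// older commit files, so logs still reachable this way are kept even past
// logRetentionDuration.
val oldSnapshot = spark.read
  .format("delta")
  .option("versionAsOf", 100)
  .load("s3://my_bucket/my_db/delta_table/")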

allisonport-db · Jul 29 '22 19:07

Great,

Currently I don't have log files older than 20 days; in 10 more days I will, and I'll provide a directory listing.

Thanks, Maor

maor-sa · Jul 30 '22 05:07

If you provide the listing for a table with logRetentionDuration=interval 7 days, that will still help. Then we can follow up once the 30-day duration has passed as well.

allisonport-db · Aug 02 '22 17:08

Hi, we are experiencing the exact same problem. We have deployed the same configuration: Delta on AWS EMR (6.5.0) with Spark (3.1.2), integrated with S3, Glue, and Athena. We noticed that the table properties delta.logRetentionDuration and delta.checkpointInterval, set to shorten the interval for cleaning _delta_log, don't seem to be picked up, and _delta_log is not cleaned accordingly.

Do you have an update on the analysis of this ticket? This is strongly impacting our usage of Delta.

MarcBertini · Aug 04 '22 14:08

Hey @MarcBertini, is it cleaning up the logs correctly after the default 30-day interval? It would be ideal to pinpoint whether this is an issue with the log cleanup or with the table property logRetentionDuration.

Again, a directory listing of the _delta_log along with the table properties for that table would help a lot.

allisonport-db · Aug 04 '22 18:08

Please see the attached listing. There are too many files, so I copied only the first and last entries.

2022-07-10 07:30:34    1161483 00000000000000000000.json
2022-07-10 16:16:22      47124 00000000000000000001.json
2022-07-10 16:16:25     688731 00000000000000000002.json
2022-07-10 16:16:33      52619 00000000000000000003.json
2022-07-10 16:16:35        686 00000000000000000004.json
2022-07-10 16:16:38      30659 00000000000000000005.json
2022-07-10 16:16:40      52952 00000000000000000006.json
2022-07-10 16:16:42      49792 00000000000000000007.json
2022-07-10 16:16:46        683 00000000000000000008.json
2022-07-10 16:16:48        687 00000000000000000009.json
2022-07-10 16:16:52     100712 00000000000000000010.json
2022-07-10 16:16:54     153992 00000000000000000011.json
2022-07-10 16:16:54     525013 00000000000000000010.checkpoint.parquet
2022-07-10 16:16:56     136784 00000000000000000012.json
2022-07-10 16:16:58      53387 00000000000000000013.json
2022-07-10 16:17:01      56053 00000000000000000014.json
2022-07-10 16:17:10      28401 00000000000000000015.json
2022-07-10 16:17:13      90651 00000000000000000016.json
2022-07-10 16:17:20     102606 00000000000000000017.json
2022-07-10 16:17:47      42092 00000000000000000018.json
2022-07-10 16:18:10     115620 00000000000000000019.json
2022-07-10 16:18:15    2156391 00000000000000000020.json
2022-07-10 16:18:19    1149825 00000000000000000020.checkpoint.parquet
2022-07-10 16:18:39     115033 00000000000000000021.json
2022-07-10 16:18:41     438287 00000000000000000022.json
2022-07-10 16:18:47     893493 00000000000000000023.json
2022-07-10 16:19:26     471089 00000000000000000024.json
2022-07-10 16:20:23     223991 00000000000000000025.json
2022-07-10 16:20:36     167727 00000000000000000026.json
2022-07-10 16:20:39      58193 00000000000000000027.json
2022-07-10 16:20:47     217887 00000000000000000028.json
2022-07-10 16:20:50     131138 00000000000000000029.json
2022-07-10 16:20:53     151144 00000000000000000030.json
2022-07-10 16:20:56     163496 00000000000000000031.json
2022-07-10 16:20:57    1477671 00000000000000000030.checkpoint.parquet
2022-07-10 16:22:12     313318 00000000000000000032.json

...
2022-08-05 09:37:32      47181 00000000000000016870.json
2022-08-05 09:37:43    1295074 00000000000000016871.json
2022-08-05 09:37:51     159626 00000000000000016872.json
2022-08-05 09:38:02      26303 00000000000000016873.json
2022-08-05 09:38:09     247660 00000000000000016874.json
2022-08-05 09:38:19      18253 00000000000000016875.json
2022-08-05 09:38:28      10718 00000000000000016876.json
2022-08-05 09:38:28   12190223 00000000000000016870.checkpoint.parquet
2022-08-05 09:38:35      46996 00000000000000016877.json
2022-08-05 09:38:52      92177 00000000000000016878.json
2022-08-05 09:39:14       5228 00000000000000016879.json
2022-08-05 09:39:22        785 00000000000000016880.json
2022-08-05 09:39:35       1301 00000000000000016881.json
2022-08-05 09:39:44      32850 00000000000000016882.json
2022-08-05 09:39:55      97907 00000000000000016883.json
2022-08-05 09:40:01   12823254 00000000000000016880.checkpoint.parquet
2022-08-05 09:40:04     115188 00000000000000016884.json
2022-08-05 09:40:15     107613 00000000000000016885.json
2022-08-05 09:40:42     101727 00000000000000016886.json
2022-08-05 09:40:57        685 00000000000000016887.json
2022-08-05 09:41:05      15711 00000000000000016888.json
2022-08-05 09:42:58      53890 00000000000000016889.json
2022-08-05 09:43:30     116405 00000000000000016890.json
2022-08-05 09:43:44     123736 00000000000000016891.json
2022-08-05 09:43:46   12842603 00000000000000016890.checkpoint.parquet
2022-08-05 09:43:59      71182 00000000000000016892.json
2022-08-05 09:44:17     107892 00000000000000016893.json
2022-08-05 09:44:30     266551 00000000000000016894.json
2022-08-05 09:44:39     395342 00000000000000016895.json
2022-08-05 09:44:50     140420 00000000000000016896.json
2022-08-05 09:45:00      79496 00000000000000016897.json
2022-08-05 09:45:08     224944 00000000000000016898.json
2022-08-05 09:45:18     152049 00000000000000016899.json
2022-08-05 09:45:27     245766 00000000000000016900.json
2022-08-05 09:45:34     402397 00000000000000016901.json
2022-08-05 09:45:47      37200 00000000000000016902.json
2022-08-05 09:45:56     183626 00000000000000016903.json
2022-08-05 09:45:57   12651952 00000000000000016900.checkpoint.parquet
2022-08-05 09:45:59         32 _last_checkpoint
2022-08-05 09:46:03     172050 00000000000000016904.json
2022-08-05 09:46:16     166998 00000000000000016905.json
2022-08-05 09:46:25      58642 00000000000000016906.json
2022-08-05 09:46:34     506092 00000000000000016907.json

maor-sa · Aug 05 '22 14:08

Hi @allisonport-db, from our analysis, the default 30-day interval is taken into account by Delta, but changes to our table properties in the Glue metastore to clean _delta_log earlier are not taken into account. We will provide the details.

MarcBertini · Aug 09 '22 16:08

Hey @allisonport-db, apparently log files are being deleted after 30 days; please see the attached listing.

2022-07-15 20:00:08   87528696 00000000000000005380.checkpoint.parquet
2022-07-15 19:58:08      37199 00000000000000005380.json
2022-07-15 19:58:34     124280 00000000000000005381.json
2022-07-15 19:59:02     254735 00000000000000005382.json
2022-07-15 19:59:30     230309 00000000000000005383.json
2022-07-15 19:59:56     112304 00000000000000005384.json
2022-07-15 20:00:24     149957 00000000000000005385.json
2022-07-15 20:00:52     145427 00000000000000005386.json
2022-07-15 20:01:19     122475 00000000000000005387.json
2022-07-15 20:02:12     465044 00000000000000005388.json
2022-07-15 20:36:12      47583 00000000000000005389.json
2022-07-15 20:39:50   87321827 00000000000000005390.checkpoint.parquet
2022-07-15 20:37:05      47801 00000000000000005390.json
2022-07-15 20:37:47      45122 00000000000000005391.json
2022-07-15 20:38:15      52126 00000000000000005392.json
2022-07-15 20:38:41      92072 00000000000000005393.json
2022-07-15 20:39:08      99140 00000000000000005394.json
2022-07-15 20:39:37      91946 00000000000000005395.json
2022-07-15 20:40:06      83005 00000000000000005396.json
2022-07-15 20:40:33      49017 00000000000000005397.json
2022-07-15 20:40:59      46727 00000000000000005398.json
2022-07-15 20:41:24        684 00000000000000005399.json
2022-07-15 20:44:31   88046580 00000000000000005400.checkpoint.parquet

...
2022-08-15 08:52:01    1212056 00000000000000016940.checkpoint.parquet
2022-08-15 08:51:55     162214 00000000000000016940.json
2022-08-15 08:52:37     109890 00000000000000016941.json
2022-08-15 08:52:43      53134 00000000000000016942.json
2022-08-15 08:53:15      80952 00000000000000016943.json
2022-08-15 08:53:20     224000 00000000000000016944.json
2022-08-15 08:53:30     322005 00000000000000016945.json
2022-08-15 08:53:35     563529 00000000000000016946.json
2022-08-15 08:53:40     456412 00000000000000016947.json
2022-08-15 08:53:45     124597 00000000000000016948.json
2022-08-15 08:53:50     137456 00000000000000016949.json
2022-08-15 08:54:01    1218797 00000000000000016950.checkpoint.parquet
2022-08-15 08:53:57     274748 00000000000000016950.json
2022-08-15 08:54:10     119825 00000000000000016951.json
2022-08-15 08:54:21     178450 00000000000000016952.json
2022-08-15 08:54:26     198690 00000000000000016953.json
2022-08-15 08:55:11     911431 00000000000000016954.json
2022-08-15 08:54:02         31 _last_checkpoint

maor-sa · Aug 15 '22 13:08

> changes in our Table properties in Glue metastore

Not sure how you changed this. The value has to be stored in the Delta transaction log; storing it only in Glue won't work. The command to set it is ALTER TABLE, e.g. ALTER TABLE <table> SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 7 days').

Can you try to run DESCRIBE DETAIL on the table and check the value of the properties column (does it have delta.logRetentionDuration and is the value correct)? This is the way to verify whether your setting is working or not.
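
For example, something along these lines from spark-shell (placeholder path):

// Sketch (placeholder path): DESCRIBE DETAIL returns one row per table; the
// `properties` column should contain delta.logRetentionDuration if the
// setting actually reached the Delta transaction log.
spark.sql("DESCRIBE DETAIL delta.`s3://my_bucket/my_db/delta_table/`")
  .select("properties")
  .show(truncate = false)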

zsxwing · Aug 15 '22 14:08

Closing this as there is no update. Feel free to re-open if you can provide more information.

zsxwing · Sep 14 '22 17:09