[Question] Record loss when using Delta format
Hi team,
We have a CDC pipeline that updates records in AWS S3. Multiple collections are processed in parallel using Python + Spark. Version details:
Apache Spark: 3.0.2
Python: 3.6.9
Java: 1.8.0_312
Delta: 0.8.0
While performing data sanity checks we noticed that a couple of records were missing from the table, even though they were present the previous day. We were able to access the same records using the "timestampAsOf" time travel option. The table in question has no delete operations; we only perform inserts or merges. Below is the sample merge code used in this case.
from delta.tables import DeltaTable

partition_list = ["year", "month"]
(
    DeltaTable
    .forPath(spark, delta_path)  # forPath expects the SparkSession, not the SparkContext
    .alias("t")
    .merge(data.alias("s"), "((t.year = 2020 AND t.month = 1) OR (t.year = 2020 AND t.month = 2)) AND t.pkey_id = s.pkey_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
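For reference, this is roughly how we confirmed the records exist in the previous day's snapshot but not in the current one (a minimal sketch; the timestamp literal is illustrative and pkey_id is our primary key):

current_df = spark.read.format("delta").load(delta_path)

# Same table as of the previous day ("2021-01-31 00:00:00" is illustrative).
previous_df = (
    spark.read.format("delta")
    .option("timestampAsOf", "2021-01-31 00:00:00")
    .load(delta_path)
)

# Keys present in yesterday's snapshot but absent today = the lost records.
lost_keys = previous_df.select("pkey_id").subtract(current_df.select("pkey_id"))
lost_keys.show(truncate=False)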
Properties set at the Spark level:
spark.databricks.delta.schema.autoMerge.enabled: 'true'
spark.delta.merge.repartitionBeforeWrite: 'true'
delta.compatibility.symlinkFormatManifest.enabled: 'true'
spark.databricks.delta.logRetentionDuration: '10 days'
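For completeness, these are applied roughly as below (a sketch; the property spellings are taken from our job config and we have not verified that each one is recognized):

# Session-level configs (keys as in our job config).
# Note: in OSS Delta the documented key for the second one is
# spark.databricks.delta.merge.repartitionBeforeWrite.enabled; worth
# verifying which spelling this Delta version actually recognizes.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
spark.conf.set("spark.delta.merge.repartitionBeforeWrite", "true")

# The remaining two appear to be *table* properties rather than session
# configs; as table properties they would be set roughly like this
# (path-based syntax; assumes Delta's SQL extensions are enabled):
spark.sql("""
  ALTER TABLE delta.`{path}` SET TBLPROPERTIES (
    'delta.compatibility.symlinkFormatManifest.enabled' = 'true',
    'delta.logRetentionDuration' = 'interval 10 days'
  )
""".format(path=delta_path))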
Can you guide us on how to find the root cause of this issue? No exceptions are thrown at this stage.
Note: We have confirmed that this happens while writing to the Delta table, as these records are successfully pushed to Kafka and Postgres at a later stage using the same DataFrame.
Is it possible the missing records in question were the records that matched your merge condition and were updated?
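One way to narrow this down: Delta 0.8.0 records operation metrics in the table history, so you can check, per commit, how many target rows each MERGE updated, deleted, or inserted around the time the records disappeared. A minimal sketch (the metric key names may vary slightly across Delta versions):

from delta.tables import DeltaTable
from pyspark.sql import functions as F

history_df = DeltaTable.forPath(spark, delta_path).history()

(
    history_df
    .select(
        "version",
        "timestamp",
        "operation",
        # operationMetrics is a map<string,string>; these are the MERGE keys.
        F.col("operationMetrics")["numTargetRowsUpdated"].alias("updated"),
        F.col("operationMetrics")["numTargetRowsDeleted"].alias("deleted"),
        F.col("operationMetrics")["numTargetRowsInserted"].alias("inserted"),
    )
    .orderBy("version")
    .show(truncate=False)
)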
Otherwise it's difficult to track this down without a repro; is it possible you could share one?
This may be related to https://github.com/delta-io/delta/issues/527