[SUPPORT] Hudi delete not working via spark apis
Describe the problem you faced
I am trying to delete from a Hudi table using the Spark APIs, but I am neither observing any exceptions nor are the records getting deleted. A deltacommit is generated successfully for the delete request under the .hoodie/ folder.
To Reproduce

Steps to reproduce the behavior:

1. Launch the pyspark shell from EMR:

pyspark --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

2. Read the Hudi table as a DataFrame:

df = spark.read.format('hudi').load('s3://${bucket_name}/customer_event_3')
df.show()

|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name|customer_event_id|client_id| customer_id|event_id| event_date| created_date|updated_date|source_reference_id|total_amount|customer_event_status|created_by|updated_by|
+-------------------+--------------------+------------------+----------------------+--------------------+-----------------+---------+------------+--------+-------------------+-------------------+------------+-------------------+------------+---------------------+----------+----------+
| 20220809121931258|20220809121931258...| 523312| 523312|00f5d98d-13b4-4e4...| 266| 523312|618026952022| 37|2022-06-14 08:10:00|2022-06-14 08:00:08| null| null| 30.00| null| null| null|
| 20220809122023456|20220809122023456...| 523313| 523313|2660d6d9-999a-4b1...| 267| 523313|618026952023| 38|2022-06-14 08:10:00|2022-06-14 08:00:08| null| null| 30.00| null| null| null|
+-------------------+--------------------+------------------+----------------------+--------------------+-----------------+---------+------------+--------+-------------------+-------------------+------------+-------------------+------------+---------------------+----------+----------+

3. Filter the records to be deleted and save as a DataFrame:

df1 = df.filter('client_id = 523312')

4. Set the delete configs and submit the delete request:

hudi_delete_options = {
    'hoodie.table.name': "customer_event_3",
    'hoodie.datasource.write.table.name': "customer_event_3",
    'hoodie.datasource.write.recordkey.field': "client_id",
    'hoodie.datasource.write.partitionpath.field': "client_id",
    'hoodie.datasource.write.operation': "delete"
}

df1.write.format("hudi") \
    .options(**hudi_delete_options) \
    .option('hoodie.datasource.write.operation', 'delete') \
    .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \
    .mode("append") \
    .save('s3://offline-store-qa/customer_event_3')

5. Validate by reading the table as a DataFrame and displaying it:

df = spark.read.format('hudi').load('s3://${bucket_name}/customer_event_3')
df.show()
Expected behavior

The record with client_id 523312 should be removed from the table.
Environment Description

- Hudi version : 0.10.1-amzn-0
- Spark version : 3.2.0
- Hive version : 3.1.2
- Hadoop version : 3.2.1
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : No
Additional context
.hoodie/hoodie.properties file
#Properties saved on Tue Aug 09 12:19:27 IST 2022
#Tue Aug 09 12:19:27 IST 2022
hoodie.table.partition.fields=client_id
hoodie.table.type=MERGE_ON_READ
hoodie.archivelog.folder=archived
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
hoodie.timeline.layout.version=1
hoodie.table.version=3
hoodie.table.recordkey.fields=client_id
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.keygenerator.class=
hoodie.table.name=customer_event_3
hoodie.datasource.write.hive_style_partitioning=false
Stacktrace
It's not throwing any error logs.
@yihua can you pls provide an update on this?
@rjmblc some points which may cause this issue:

1. Do you provide the configs spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension and spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog?
2. Can you avoid setting the client_id field as both the primary key and the partition field at the same time?
3. Maybe 'hoodie.datasource.write.payload.class' doesn't need to be set.
Yes, likely the issue could be (3) from yann's comment above. If you are setting the operation type to "delete", you don't need to override the payload class. If you are explicitly setting it to EmptyHoodieRecordPayload, then you don't need to set the operation type to "delete".

Also, can you confirm that your filtered df is actually not empty? Instead of writing to Hudi, did you do a df.count() to ensure there are valid records?

Can you also post the contents of the .hoodie/*.commit or .hoodie/*.deltacommit file that got added to the .hoodie dir when you triggered the delete operation?
@rjmblc : any updates in this regard?
@nsivabalan I will check and update today. Thanks
thanks!
@nsivabalan I added both these configs:

"spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
"spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog"

and removed the payload class. I'm stuck with the following error message; I also tried changing the Hudi bundle version to 0.11. Any tips for further troubleshooting?

Error Message:

py4j.protocol.Py4JJavaError: An error occurred while calling o90.save. : org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'spark_catalog': org.apache.spark.sql.hudi.catalog.HoodieCatalog
@nsivabalan I resolved the HoodieCatalog config issue but still facing the same issue where hard delete is not happening. I am attaching the deltacommit file below for your reference.
{
  "partitionToWriteStats" : { },
  "compacted" : false,
  "extraMetadata" : {
    "schema" : "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"customer_event_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"client_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"customer_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_date\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"created_date\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"updated_date\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"source_reference_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"total_amount\",\"type\":[\"null\",{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}],\"default\":null},{\"name\":\"customer_event_status\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"created_by\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"updated_by\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
  },
  "operationType" : "DELETE",
  "totalRecordsDeleted" : 0,
  "totalLogRecordsCompacted" : 0,
  "totalLogFilesCompacted" : 0,
  "totalCompactedRecordsUpdated" : 0,
  "totalLogFilesSize" : 0,
  "totalScanTime" : 0,
  "totalCreateTime" : 0,
  "totalUpsertTime" : 0,
  "minAndMaxEventTime" : {
    "Optional.empty" : {
      "val" : null,
      "present" : false
    }
  },
  "fileIdAndRelativePaths" : { },
  "writePartitionPaths" : [ ]
}
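As an aside for anyone triaging a similar report: the tell-tale signs of a no-op delete in a deltacommit file are an empty partitionToWriteStats and a totalRecordsDeleted of 0. A small plain-Python sketch to check this (is_noop_delete is a hypothetical helper name, not a Hudi API):

```python
import json

# Hypothetical helper: detect a delete commit that matched no records,
# i.e. the "empty commit" symptom described in this issue.
def is_noop_delete(commit_json: str) -> bool:
    meta = json.loads(commit_json)
    return (meta.get("operationType") == "DELETE"
            and meta.get("totalRecordsDeleted", 0) == 0
            and not meta.get("partitionToWriteStats"))

# Trimmed-down version of the deltacommit above:
sample = '{"partitionToWriteStats": {}, "operationType": "DELETE", "totalRecordsDeleted": 0}'
print(is_noop_delete(sample))  # True: the delete touched nothing
```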
I reused our quick start guide and was able to get it working.
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
mode(Overwrite).
save(basePath)
// spark-shell
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
spark.sql("select distinct partitionpath, uuid, fare from hudi_trips_snapshot").show(false)
// picked two random UUIDs from previous output.
val dfToDelete = spark.sql("select * from hudi_trips_snapshot where uuid in ('151d3208-18d7-4b88-9e8a-4994f44bc1a9','3bbf759d-e5d7-43f0-a924-01d253b263d5')")
dfToDelete.show()
dfToDelete.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
mode(Append).
save(basePath)
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
spark.sql("select distinct partitionpath, uuid, fare from hudi_trips_snapshot").show()
spark.sql("select distinct partitionpath, uuid, fare from hudi_trips_snapshot where uuid in ('151d3208-18d7-4b88-9e8a-4994f44bc1a9','3bbf759d-e5d7-43f0-a924-01d253b263d5')").show(false)
+-------------+----+----+
|partitionpath|uuid|fare|
+-------------+----+----+
+-------------+----+----+
btw, as pointed out earlier, since I am setting the operation type to "delete", I am not setting any value for the payload class. It's not required (EmptyHoodieRecordPayload).
@rjmblc : any updates please.
@nsivabalan Didn't get a chance to validate. I will do it today. Thanks
sure.
@rjmblc : do you have any updates on this please.
@nsivabalan I tried setting the operation type to "delete" and not setting any value for the payload class, but I am still getting an empty commit and the delete is not happening. Below are the script, hoodie properties, and delta commit files. I cross-checked and updateDF is not empty. Here I am trying to delete the entire table's records.
Delete record script:

updateDF = spark.read.format('org.apache.hudi').load('s3://offline-store-qa/customer_event_3/')
updateDF.show()

hudiOptions = {
    'hoodie.table.name': 'customer_event_3',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
    'hoodie.datasource.write.recordkey.field': 'client_id,event_date,customer_event_id',
    'hoodie.datasource.write.partitionpath.field': 'client_id',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.table': 'customer_event_3',
    'hoodie.datasource.hive_sync.partition_fields': 'client_id',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}

updateDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'delete') \
    .options(**hudiOptions) \
    .mode('append') \
    .save('s3://offline-store-qa/customer_event_3/')
Deltacommit & hoodie properties file: Downloads.zip
@rjmblc @nsivabalan @yihua UPDATE: found what was missing while I was sharing the details here. I use hive-style partitioning on the Hudi table. The same option also needs to be set in the delete options ('hoodie.datasource.write.hive_style_partitioning': 'true'). That solved it for me.
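For future readers, a minimal sketch of what the fixed delete options could look like, based on the resolution above (table and field names are taken from this thread; verify every value against your own hoodie.properties):

```python
# Sketch only: delete options for the table discussed in this thread.
# The decisive fix was aligning hive_style_partitioning with how the
# table was originally written; when it differs, the delete presumably
# resolves no matching partition paths and produces an empty commit.
hudi_delete_options = {
    "hoodie.table.name": "customer_event_3",
    "hoodie.datasource.write.recordkey.field": "client_id",
    "hoodie.datasource.write.partitionpath.field": "client_id",
    "hoodie.datasource.write.operation": "delete",
    # Must match the table's existing partition layout:
    "hoodie.datasource.write.hive_style_partitioning": "true",
}

# Usage (df_to_delete is assumed to hold the rows to remove):
# df_to_delete.write.format("hudi").options(**hudi_delete_options) \
#     .mode("append").save("s3://offline-store-qa/customer_event_3")
```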
I am facing the exact same issue on Hudi 0.9.0 as well, with S3 as the storage layer. One difference is that my table is of type COW, and I am using Hudi with pyspark on AWS Glue 2.0 (Spark 2.4.3).

However, all other behaviours are the same: there is no failure, just an empty commit that does not delete any data. I tried both with and without write.payload.class.

I can share more details if it helps find the root cause in earlier versions as well.

Thanks.
UPDATE: Adding the spark-submit command which I am using to submit jobs to the EMR cluster:

spark-submit --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.sql.hive.convertMetastoreParquet=false' --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar filename.py
@jdattani That option is set to false in my hoodie.properties file. @nsivabalan Adding a few more details to my issue. I set 'hoodie.datasource.write.operation' to 'delete' but am still unable to delete the records.

Could the problem be that the pyspark script is unable to get the partition details?
1. The input Hudi table is created by a Flink streaming job (I have no control over it); the source code for the DDL: 1.Flink_Input_Source_DDL.zip
2. Pyspark script to delete the records: 2.hudi_delete_pyspark_script.zip
3. Hudi table properties file: 3.hoodie_properties.zip
4. Empty delta commit file: 4.delta_commit_file.zip
Thanks
UPDATE: @nsivabalan I tried after compacting the MOR table but am still getting the same issue.
@nsivabalan I just referred to the EMR documentation for Hudi delete, and it looks like the full table schema needs to be passed for a delete. Can you please let me know how to pass the full table schema?

"Deletion requires schema – When deleting, you must specify the record key, the partition key, and the pre-combine key fields. Other columns can be made null or empty, but the full schema is required."

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-considerations.html
Sorry to have dropped the ball on this. What the instructions say is: to write to Hudi, you might be doing something like

df.write.format("hudi").....save(path)

Here, the df's schema is expected to be the table's schema; just the record key, partition path, and pre-combine fields have to be non-null, and the rest of the fields can be null.

But please do give it a try with later versions. I highly doubt that delete requires the entire table schema; I vaguely remember we relaxed that constraint later.
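To illustrate the "full schema, nullable non-key fields" reading of that documentation, here is a plain-Python sketch (field names are borrowed from the quickstart example earlier in this thread; build_delete_record is a hypothetical helper, not a Hudi API):

```python
# Hypothetical helper: build a delete record that carries the full table
# schema but populates only the record key, partition path, and
# precombine fields; every other column is present but explicitly None.
def build_delete_record(all_fields, record_key, partition_path, precombine):
    row = {field: None for field in all_fields}
    row["uuid"] = record_key            # record key field in the quickstart schema
    row["partitionpath"] = partition_path
    row["ts"] = precombine              # precombine field
    return row

# Quickstart-style schema: key/partition/precombine plus payload columns.
fields = ["uuid", "partitionpath", "ts", "rider", "driver", "fare"]
rec = build_delete_record(fields,
                          "151d3208-18d7-4b88-9e8a-4994f44bc1a9",
                          "americas/united_states/san_francisco",
                          0)
```

A list of such rows could then be turned into a DataFrame with spark.createDataFrame and written with the delete operation, as in the quickstart snippet above.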
@rjmblc Did you get a chance to try out with Hudi 0.12.2 version?
@rjmblc : can we have some updates here? I don't see any activity for quite some time. If you got it resolved, feel free to close it out, or let us know how we can help.
I have the same issue: I am trying to delete records from the Hudi table, but it is not happening. I am using the following EMR configuration: EMR 5.33.0, Spark 2.4.7 and Hudi 0.7.0.
@neeruks @rjmblc Can you please try with later versions of Hudi? I see it is all working seamlessly even with both the settings.
from pyspark.sql.types import *
from pyspark.sql.functions import *
import time
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import Row
from datetime import date
spark = SparkSession \
.builder \
.master("local[1]") \
.config("spark.driver.memory", "8g") \
.config("spark.jars.packages", "org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.0") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.getOrCreate()
common_config={
"hoodie.datasource.write.table.type": "MERGE_ON_READ",
"hoodie.datasource.write.recordkey.field": "id",
"hoodie.datasource.write.precombine.field": "timestamp",
"hoodie.datasource.write.partitionpath.field": "id",
'hoodie.table.name': 'issue_8625',
"hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
}
# Define the schema
schema = StructType([
StructField("id", IntegerType(), True),
StructField("b", IntegerType(), True),
StructField("c", StringType(), True),
StructField("d", DateType(), True),
StructField("timestamp", TimestampType(), True),
StructField("f", StringType(), True)
])
df = spark.createDataFrame([
Row(id=1, b=2, c='string1', d=date(2000, 1, 1), timestamp=datetime.strptime("2000-01-01 12:00:00", '%Y-%m-%d %H:%M:%S'), f = "2000-01-01 12:00:00"),
Row(id=2, b=3, c='string2', d=date(2000, 2, 1), timestamp=datetime.strptime("2000-01-01 12:00:00", '%Y-%m-%d %H:%M:%S'), f = "2000-01-01 12:21:20"),
Row(id=4, b=5, c='string3', d=date(2000, 3, 1), timestamp=datetime.strptime("2000-01-01 12:00:00", '%Y-%m-%d %H:%M:%S'), f = "2000-01-03 12:00:00")
], schema)
path = "file:///tmp/issue_6341_5"
df.write.format("org.apache.hudi") \
.options(**common_config) \
.mode("append") \
.save(path)
spark.read.format("hudi").load(path).show()
newdf = spark.createDataFrame([
Row(id=1, b=16, c=None, d=None, timestamp=datetime.strptime("2000-01-01 12:35:00", '%Y-%m-%d %H:%M:%S'), f=None)
], schema)
newdf.write.format("org.apache.hudi") \
.options(**common_config) \
.option("hoodie.datasource.write.operation", "delete") \
.option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \
.mode("append") \
.save(path)
spark.read.format("hudi").load(path).show()
@neeruks @rjmblc Are you getting a similar issue with the later version of Hudi? If not, feel free to close the issue.
@rjmblc I'm facing a similar issue to the one you had, of Spark not being able to find the HoodieCatalog class. You mentioned you fixed this issue; can you please provide details on how you did it?
Did you confirm these?
https://github.com/apache/hudi/issues/6341#issuecomment-1227436935