[SUPPORT] Hudi delete not working via spark apis

Open rjmblc opened this issue 3 years ago • 15 comments

Describe the problem you faced

I am trying to delete from a Hudi table using the Spark APIs, but I am neither observing any exceptions nor are the records getting deleted. A deltacommit is generated successfully for the delete request under the .hoodie/ folder.

To Reproduce Steps to reproduce the behavior:

  1. Launch the pyspark shell from EMR:

pyspark --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

  2. Read the hudi table as a dataframe:

df = spark.read.format('hudi').load('s3://${bucket_name}/customer_event_3')

    df.show()
+-------------------+--------------------+------------------+----------------------+--------------------+-----------------+---------+------------+--------+-------------------+-------------------+------------+-------------------+------------+---------------------+----------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|customer_event_id|client_id| customer_id|event_id|         event_date|       created_date|updated_date|source_reference_id|total_amount|customer_event_status|created_by|updated_by|
+-------------------+--------------------+------------------+----------------------+--------------------+-----------------+---------+------------+--------+-------------------+-------------------+------------+-------------------+------------+---------------------+----------+----------+
|  20220809121931258|20220809121931258...|            523312|                523312|00f5d98d-13b4-4e4...|              266|   523312|618026952022|      37|2022-06-14 08:10:00|2022-06-14 08:00:08|        null|               null|       30.00|                 null|      null|      null|
|  20220809122023456|20220809122023456...|            523313|                523313|2660d6d9-999a-4b1...|              267|   523313|618026952023|      38|2022-06-14 08:10:00|2022-06-14 08:00:08|        null|               null|       30.00|                 null|      null|      null|
+-------------------+--------------------+------------------+----------------------+--------------------+-----------------+---------+------------+--------+-------------------+-------------------+------------+-------------------+------------+---------------------+----------+----------+

  3. Filter the records to be deleted and save them as a dataframe:

df1 = df.filter('client_id = 523312')

  4. Set the delete configs and submit the delete request:

hudi_delete_options = {
    'hoodie.table.name': "customer_event_3",
    'hoodie.datasource.write.table.name': "customer_event_3",
    'hoodie.datasource.write.recordkey.field': "client_id",
    'hoodie.datasource.write.partitionpath.field': "client_id",
    'hoodie.datasource.write.operation': "delete"
}

df1.write.format("hudi") \
    .options(**hudi_delete_options) \
    .option('hoodie.datasource.write.operation', 'delete') \
    .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \
    .mode("append") \
    .save('s3://offline-store-qa/customer_event_3')

  5. Validate by reading the table as a dataframe and displaying it:

df = spark.read.format('hudi').load('s3://${bucket_name}/customer_event_3')
df.show()

Expected behavior

The record with client_id 523312 should be removed from the table.

Environment Description

  • Hudi version : 0.10.1-amzn-0

  • Spark version : 3.2.0

  • Hive version : 3.1.2

  • Hadoop version : 3.2.1

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No

Additional context

.hoodie/hoodie.properties file:

#Properties saved on Tue Aug 09 12:19:27 IST 2022
#Tue Aug 09 12:19:27 IST 2022
hoodie.table.partition.fields=client_id
hoodie.table.type=MERGE_ON_READ
hoodie.archivelog.folder=archived
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
hoodie.timeline.layout.version=1
hoodie.table.version=3
hoodie.table.recordkey.fields=client_id
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.keygenerator.class=
hoodie.table.name=customer_event_3
hoodie.datasource.write.hive_style_partitioning=false

Stacktrace

It is not throwing any error logs.

rjmblc avatar Aug 09 '22 07:08 rjmblc

@yihua can you pls provide an update on this?

rjmblc avatar Aug 18 '22 16:08 rjmblc

@rjmblc some points which may be causing this issue:

  1. Do you provide the configs spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension and spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog?
  2. Can you avoid setting the client_id field as both the primary key and the partition field at the same time?
  3. Maybe 'hoodie.datasource.write.payload.class' doesn't need to be set.

YannByron avatar Aug 25 '22 15:08 YannByron

Yes, likely the issue is (3) from Yann's comment above. If you are setting the operation type to "delete", you don't need to override the payload class. If you are explicitly setting it to EmptyPayload, then you don't need to set the operation type to "delete".
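To make the two routes concrete, either of the following option sets should trigger a hard delete (editor's sketch: the option names are real Hudi configs, the table and path names are taken from this issue, and the dict layout is illustrative — pick one route, not both):

```python
# Option A: set the operation type and let Hudi use the empty payload internally.
delete_via_operation = {
    "hoodie.table.name": "customer_event_3",
    "hoodie.datasource.write.recordkey.field": "client_id",
    "hoodie.datasource.write.partitionpath.field": "client_id",
    "hoodie.datasource.write.operation": "delete",
}

# Option B: keep the default operation but swap in the empty record payload.
delete_via_payload = {
    "hoodie.table.name": "customer_event_3",
    "hoodie.datasource.write.recordkey.field": "client_id",
    "hoodie.datasource.write.partitionpath.field": "client_id",
    "hoodie.datasource.write.payload.class":
        "org.apache.hudi.common.model.EmptyHoodieRecordPayload",
}

# The write itself would then look like (not run here, needs a cluster):
# df1.write.format("hudi").options(**delete_via_operation) \
#     .mode("append").save("s3://.../customer_event_3")
```

Combining both routes in one write is redundant rather than harmful; the point above is that either one alone is sufficient.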

Also, can you confirm that your filtered df is actually not empty? Instead of writing to Hudi, did you do a df.count() to ensure there are valid records?

Can you also post the contents of the .hoodie/*.commit or .hoodie/*.deltacommit file that got added to the .hoodie dir when you triggered the delete operation?

nsivabalan avatar Aug 25 '22 23:08 nsivabalan

@rjmblc : any updates in this regard?

nsivabalan avatar Aug 30 '22 23:08 nsivabalan

@nsivabalan I will check and update today. Thanks

rjmblc avatar Aug 31 '22 15:08 rjmblc

thanks!

nsivabalan avatar Aug 31 '22 18:08 nsivabalan

@nsivabalan I added both of these configs "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" and "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" and removed the payload class. I'm now stuck with the following error message; I also tried changing the hudi bundle version to 0.11. Any tips for further troubleshooting?

Error Message: py4j.protocol.Py4JJavaError: An error occurred while calling o90.save. : org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'spark_catalog': org.apache.spark.sql.hudi.catalog.HoodieCatalog

rjmblc avatar Sep 01 '22 15:09 rjmblc

@nsivabalan I resolved the HoodieCatalog config issue but am still facing the same issue where the hard delete is not happening. I am attaching the deltacommit file below for your reference.

{
  "partitionToWriteStats" : { },
  "compacted" : false,
  "extraMetadata" : {
    "schema" : "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"customer_event_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"client_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"customer_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_date\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"created_date\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"updated_date\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"source_reference_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"total_amount\",\"type\":[\"null\",{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}],\"default\":null},{\"name\":\"customer_event_status\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"created_by\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"updated_by\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
  },
  "operationType" : "DELETE",
  "totalRecordsDeleted" : 0,
  "totalLogRecordsCompacted" : 0,
  "totalLogFilesCompacted" : 0,
  "totalCompactedRecordsUpdated" : 0,
  "totalLogFilesSize" : 0,
  "totalScanTime" : 0,
  "totalCreateTime" : 0,
  "totalUpsertTime" : 0,
  "minAndMaxEventTime" : {
    "Optional.empty" : { "val" : null, "present" : false }
  },
  "fileIdAndRelativePaths" : { },
  "writePartitionPaths" : [ ]
}
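The telling signal in that deltacommit can be checked mechanically: totalRecordsDeleted is 0 and partitionToWriteStats is empty, which suggests the incoming keys matched nothing in the table (a key or partition-path mismatch), rather than a failed write. A minimal sketch, using only field names that appear in the file above:

```python
import json

# Abbreviated copy of the relevant fields from the deltacommit above.
commit = json.loads("""{
  "partitionToWriteStats": {},
  "operationType": "DELETE",
  "totalRecordsDeleted": 0
}""")

# A DELETE commit that touched no partitions and deleted no records means
# nothing was tagged for deletion -- the write itself succeeded.
nothing_matched = (commit["operationType"] == "DELETE"
                   and commit["totalRecordsDeleted"] == 0
                   and not commit["partitionToWriteStats"])
```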

rjmblc avatar Sep 05 '22 11:09 rjmblc

I reused our quick start guide and was able to get it working.

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator


// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.datasource.write.table.type","MERGE_ON_READ").
  mode(Overwrite).
  save(basePath)

  // spark-shell
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()


spark.sql("select distinct partitionpath, uuid, fare from hudi_trips_snapshot").show(false)

// picked two random UUIDs from previous output. 

val dfToDelete = spark.sql("select * from hudi_trips_snapshot where uuid in ('151d3208-18d7-4b88-9e8a-4994f44bc1a9','3bbf759d-e5d7-43f0-a924-01d253b263d5') ")
dfToDelete.show()


dfToDelete.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY,"delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.datasource.write.table.type","MERGE_ON_READ").
  mode(Append).
  save(basePath)

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()


spark.sql("select distinct partitionpath, uuid, fare from hudi_trips_snapshot").show()


spark.sql("select distinct partitionpath, uuid, fare from hudi_trips_snapshot where uuid in ('151d3208-18d7-4b88-9e8a-4994f44bc1a9','3bbf759d-e5d7-43f0-a924-01d253b263d5')").show(false)
+-------------+----+----+
|partitionpath|uuid|fare|
+-------------+----+----+
+-------------+----+----+

nsivabalan avatar Sep 07 '22 00:09 nsivabalan

btw, as pointed out earlier, since I am setting the operation type to "delete", I am not setting any value for the payload class. It's not required (EmptyPayload).

nsivabalan avatar Sep 07 '22 00:09 nsivabalan

@rjmblc : any updates please.

nsivabalan avatar Sep 10 '22 17:09 nsivabalan

@nsivabalan Didn't get a chance to validate. I will do it today. Thanks

rjmblc avatar Sep 13 '22 13:09 rjmblc

sure.

nsivabalan avatar Sep 14 '22 22:09 nsivabalan

@rjmblc : do you have any updates on this please.

nsivabalan avatar Sep 19 '22 14:09 nsivabalan

@nsivabalan I tried setting the operation type to "delete" and not setting any value for the payload class, but I am still getting an empty commit and the delete is not happening. Below are the script, hoodie properties, and delta commit files. I cross-checked and updateDF is not empty. Here I am trying to delete the entire table's records.

Delete record script:

updateDF = spark.read.format('org.apache.hudi').load('s3://offline-store-qa/customer_event_3/')

updateDF.show()

hudiOptions = {
    'hoodie.table.name': 'customer_event_3',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
    'hoodie.datasource.write.recordkey.field': 'client_id,event_date,customer_event_id',
    'hoodie.datasource.write.partitionpath.field': 'client_id',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.table': 'customer_event_3',
    'hoodie.datasource.hive_sync.partition_fields': 'client_id',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}

updateDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'delete') \
    .options(**hudiOptions) \
    .mode('append') \
    .save('s3://offline-store-qa/customer_event_3/')

Deltacommit & hoodie properties file: Downloads.zip

rjmblc avatar Sep 21 '22 13:09 rjmblc

@rjmblc @nsivabalan @yihua UPDATE: I found what was missing while I was sharing the details here. I use hive-style partitioning on the Hudi table. The same option also needs to be set in the delete options ('hoodie.datasource.write.hive_style_partitioning': 'true'). That solved it for me.
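In other words, the delete write has to mirror how the table was originally written, or the keys and partition paths won't line up with any existing file group. A sketch of the fixed option set (editor's illustration; table name borrowed from this issue, and the exact remaining options are assumptions):

```python
# Delete options for a table written with hive-style partitioning
# (i.e. partition directories like client_id=523312/ instead of 523312/).
hudi_delete_options = {
    "hoodie.table.name": "customer_event_3",
    "hoodie.datasource.write.recordkey.field": "client_id",
    "hoodie.datasource.write.partitionpath.field": "client_id",
    "hoodie.datasource.write.operation": "delete",
    # Must match the value used for the original inserts/upserts,
    # otherwise the delete resolves to non-existent partition paths:
    "hoodie.datasource.write.hive_style_partitioning": "true",
}
```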

I am facing the exact same issue on Hudi 0.9.0 as well, with S3 as the storage layer. One difference is that my table is of type COW, and I am using Hudi with pyspark on AWS Glue 2.0 (Spark 2.4.3).

However, all other behaviours are the same. There is no failure, just an empty commit that does not delete any data. I tried both with and without write.payload.class.

I can share more details if it helps find the root cause from earlier versions as well.

Thanks.

jdattani avatar Sep 23 '22 11:09 jdattani

UPDATE: Adding the spark-submit command which I am using to submit jobs to the EMR cluster:

spark-submit --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.sql.hive.convertMetastoreParquet=false' --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar filename.py

@jdattani That option is set to false in my hoodie.properties file. @nsivabalan Adding a few more details to my issue. I set 'hoodie.datasource.write.operation': 'delete' but am still unable to delete the records.

Could the problem be that the pyspark script is unable to get the partition details?

  1. The input hudi table is created by a Flink streaming job (I have no control over it), and below is the source code for the DDL. 1.Flink_Input_Source_DDL.zip

  2. Pyspark script to delete the records 2.hudi_delete_pyspark_script.zip

  3. Hudi table properties file 3.hoodie_properties.zip

  4. Empty delta commit file 4.delta_commit_file.zip

Thanks

rjmblc avatar Sep 28 '22 13:09 rjmblc

UPDATE: @nsivabalan I tried after compacting the MOR table but am still getting the same issue.

rjmblc avatar Oct 11 '22 06:10 rjmblc

@nsivabalan I just referred to the EMR documentation for Hudi deletes, and it looks like the full table schema needs to be passed for a delete. Can you please let me know how to pass the full table schema?

"Deletion requires schema – When deleting, you must specify the record key, the partition key, and the pre-combine key fields. Other columns can be made null or empty, but the full schema is required."

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-considerations.html

rjmblc avatar Oct 11 '22 15:10 rjmblc

Sorry to have dropped the ball on this. What the instructions say is: to write to Hudi, you might be doing something like

df.write.format("hudi").....save(path)

In this, df's schema is expected to be the table's schema; just that the record key, partition path, and pre-combine fields have to be non-null, and the rest of the fields can be null.
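That shape can be sketched in plain Python (editor's illustration: the column names are borrowed from this issue's table, and treating client_id, event_date, and customer_event_id as the key fields follows the record-key config in the delete script above — which field serves as the pre-combine key here is an assumption):

```python
from datetime import datetime

# Full table schema (abbreviated to a few of the columns from this issue).
full_schema = ["client_id", "event_date", "customer_event_id",
               "customer_id", "total_amount", "customer_event_status"]

# Fields that must stay non-null in a delete record: record key fields
# (which here include the partition field) plus the pre-combine field.
key_fields = {"client_id", "event_date", "customer_event_id"}

def to_delete_row(row: dict) -> dict:
    """Keep the key fields, null out everything else, preserve the schema."""
    return {col: (row[col] if col in key_fields else None)
            for col in full_schema}

row = {"client_id": "523312", "event_date": datetime(2022, 6, 14, 8, 10),
       "customer_event_id": "266", "customer_id": "618026952022",
       "total_amount": "30.00", "customer_event_status": None}
delete_row = to_delete_row(row)
# spark.createDataFrame([delete_row]) would then be written with
# hoodie.datasource.write.operation = 'delete'.
```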

nsivabalan avatar Nov 11 '22 06:11 nsivabalan

But please do give it a try with later versions. I highly doubt delete requires the entire table schema; I vaguely remember we relaxed that constraint later.

nsivabalan avatar Nov 11 '22 06:11 nsivabalan

@rjmblc Did you get a chance to try out with Hudi 0.12.2 version?

codope avatar Feb 01 '23 14:02 codope

@rjmblc : can we have some updates here? I don't see any activity for quite some time. If you got it resolved, feel free to close it out, or let us know how we can help.

nsivabalan avatar Feb 06 '23 23:02 nsivabalan

I have the same issue. I am trying to delete records from the Hudi table, but it is not happening. I am using the following EMR configuration: EMR 5.33.0, Spark 2.4.7, and Hudi 0.7.0.

neeruks avatar Apr 21 '23 19:04 neeruks

@neeruks @rjmblc Can you please try with later versions of Hudi? I see it is all working seamlessly even with both the settings.

from pyspark.sql.types import *
from pyspark.sql.functions import *
import time
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import Row
from datetime import date

spark = SparkSession \
    .builder \
    .master("local[1]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.0") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

common_config={
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "timestamp",
    "hoodie.datasource.write.partitionpath.field": "id",
    'hoodie.table.name': 'issue_8625',
    "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
}
# Define the schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("b", IntegerType(), True),
    StructField("c", StringType(), True),
    StructField("d", DateType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("f", StringType(), True)
])

df = spark.createDataFrame([
    Row(id=1, b=2, c='string1', d=date(2000, 1, 1), timestamp=datetime.strptime("2000-01-01 12:00:00", '%Y-%m-%d %H:%M:%S'), f = "2000-01-01 12:00:00"),
    Row(id=2, b=3, c='string2', d=date(2000, 2, 1), timestamp=datetime.strptime("2000-01-01 12:00:00", '%Y-%m-%d %H:%M:%S'), f = "2000-01-01 12:21:20"),
    Row(id=4, b=5, c='string3', d=date(2000, 3, 1), timestamp=datetime.strptime("2000-01-01 12:00:00", '%Y-%m-%d %H:%M:%S'), f = "2000-01-03 12:00:00")
], schema)

path = "file:///tmp/issue_6341_5"
df.write.format("org.apache.hudi") \
    .options(**common_config) \
    .mode("append") \
    .save(path)

spark.read.format("hudi").load(path).show()

newdf = spark.createDataFrame([
    Row(id=1, b=16, c=None, d=None, timestamp=datetime.strptime("2000-01-01 12:35:00", '%Y-%m-%d %H:%M:%S'), f=None)
], schema)

newdf.write.format("org.apache.hudi") \
    .options(**common_config) \
    .option("hoodie.datasource.write.operation", "delete") \
    .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \
    .mode("append") \
    .save(path)

spark.read.format("hudi").load(path).show()

ad1happy2go avatar May 09 '23 16:05 ad1happy2go

@neeruks @rjmblc Are you getting a similar issue with the later versions of Hudi? If not, feel free to close the issue.

ad1happy2go avatar May 24 '23 16:05 ad1happy2go

> @nsivabalan I added both these config "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" and removed the payload class I’m stuck with the following error message tried changing the hudi bundle version to 11. Any tips for further troubleshooting? Error Message: py4j.protocol.Py4JJavaError: An error occurred while calling o90.save. : org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'spark_catalog': org.apache.spark.sql.hudi.catalog.HoodieCatalog

@rjmblc I'm facing a similar issue to the one you had, of Spark not being able to find the HoodieCatalog class. You mentioned you fixed this issue; can you please provide details on how you did it?

pedro-gaspar avatar Oct 13 '23 20:10 pedro-gaspar

Did you confirm these?

https://github.com/apache/hudi/issues/6341#issuecomment-1227436935

ad1happy2go avatar Oct 16 '23 08:10 ad1happy2go