[SUPPORT] repair deduplicate unable to find `_hoodie_record_key` in data
Problem
In hudi-cli I’m trying to run repair deduplicate against a partition in which I have confirmed via a separate spark query that there are in fact duplicates on the _hoodie_record_key.
I'm getting
cannot resolve '_hoodie_record_key' given input columns: []
To Reproduce
- Verify duplicates exist with separate spark query:
val basePath="s3://thebucket/data/tables/events"
val partitionPath="env_id=123/week=20220711"
val inputPath=s"$basePath/$partitionPath"
val df = spark.read.load(inputPath)
df.printSchema() // shows expected schema including `_hoodie_record_key`
df.createOrReplaceTempView("hoh")
val hoodieKeyQuery = "select _hoodie_record_key, count(*) from hoh group by _hoodie_record_key having count(*) > 1"
val dupes = spark.sql(hoodieKeyQuery)
dupes.count() // about 1000 dupes counted
- hudi-cli repair attempt:
connect --path s3://thebucket/data/tables/events
repair deduplicate --duplicatedPartitionPath "env_id=123/week=20220711" --repairedOutputPath hhdeduplicates --sparkMaster local[2] --sparkMemory 4G --dryrun true --dedupeType "upsert_type"
outputs (stack trace at end of doc):
cannot resolve '_hoodie_record_key' given input columns: []
Expected behavior
The dryrun should produce some information about files that will be fixed.
Environment Description
- Hudi version : 0.10.1
- Hudi cli version: hudi-cli-0.10.1-amzn-0.jar
- Spark version : 3.2.0
- Storage: S3
- EMR: emr-6.6.0
- Hadoop distribution:Amazon 3.2.1
Additional context
We have both a streaming ingest job and a backfill job writing to this hudi table. Here are the write options for each job:
Stream ingest write options:
HoodieWriteConfig.TBL_NAME.key -> "events",
DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "env_id,user_id,event_id",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "env_id,week",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "time",
DataSourceWriteOptions.OPERATION.key -> INSERT_OPERATION_OPT_VAL,
HIVE_STYLE_PARTITIONING.key() -> "true",
"hoodie.insert.shuffle.parallelism" -> ingestConfig.hudiInsertParallelism,
"checkpointLocation" -> ingestConfig.hudiCheckpointPath,
"hoodie.metadata.enable" -> "true",
"hoodie.bloom.index.filter.dynamic.max.entries" -> "800000",
"hoodie.datasource.write.streaming.ignore.failed.batch" -> "false",
"hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
"hoodie.cleaner.policy.failed.writes" -> "LAZY",
"hoodie.write.lock.provider" -> "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
"hoodie.write.lock.dynamodb.table" -> "datalake-locks",
"hoodie.write.lock.dynamodb.partition_key" -> "events",
"hoodie.write.lock.dynamodb.region" -> "us-east-1"
Backfill job write options:
HoodieWriteConfig.TBL_NAME.key -> "events",
DataSourceWriteOptions.TABLE_TYPE.key -> COW_TABLE_TYPE_OPT_VAL, // note: this seems to be ignored: the table was created with MOR before the backfill began.
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "env_id,user_id,event_id",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "env_id,week",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "time",
DataSourceWriteOptions.OPERATION.key -> BULK_INSERT_OPERATION_OPT_VAL,
HIVE_STYLE_PARTITIONING.key() -> "true",
"hoodie.insert.shuffle.parallelism" -> hudiInsertParallelism,
"hoodie.upsert.shuffle.parallelism" -> hudiBulkInsertParallelism,
"hoodie.metadata.enable" -> "true",
"hoodie.bloom.index.filter.dynamic.max.entries" -> "800000",
"hoodie.datasource.write.streaming.ignore.failed.batch" -> "false",
"hoodie.bulkinsert.sort.mode" -> "NONE",
"hoodie.combine.before.insert" -> "false",
"hoodie.datasource.write.row.writer.enable" -> "false",
"hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
"hoodie.cleaner.policy.failed.writes" -> "LAZY",
"hoodie.write.lock.provider" -> "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
"hoodie.write.lock.dynamodb.table" -> "datalake-locks",
"hoodie.write.lock.dynamodb.partition_key" -> "events",
"hoodie.write.lock.dynamodb.region" -> "us-east-1"
Stacktrace
22/07/22 19:18:58 ERROR SparkMain: Fail to execute commandString
org.apache.spark.sql.AnalysisException: cannot resolve '_hoodie_record_key' given input columns: []; line 5 pos 15;
'UnresolvedHaving ('dupe_cnt > 1)
+- 'Aggregate ['_hoodie_record_key], ['_hoodie_record_key AS dupe_key#0, count(1) AS dupe_cnt#1L]
+- SubqueryAlias htbl_1658517533303
+- View (htbl_1658517533303, [])
+- LocalRelation <empty>
I think setting spark.sql.parquet.mergeSchema to true may resolve this error. But could there be data files with differing schemas in this partition? I think we need to check that.
@KnightChess I added spark.sql.parquet.mergeSchema true to the spark properties file, then reconnected to the hudi-cli and re-tried the repair command again. The result was the same:
org.apache.spark.sql.AnalysisException: cannot resolve '_hoodie_record_key' given input columns: []
I am pretty confident that there is no schema diff in the data files.
Can you verify whether the duplicates are in the same file? That is, does the _hoodie_file_name column have the same value or different values for the duplicated keys?
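One way to answer this question is sketched below in Spark/Scala. It groups the partition's rows by `_hoodie_record_key` and reports, for each duplicated key, how many distinct `_hoodie_file_name` values it appears under. The object and function names (`DupeFileSpread`, `dupeFileSpread`) are hypothetical, and the input path is the reporter's placeholder from the reproduction steps. Treat this as a sketch rather than a verified diagnostic: a raw parquet read also picks up older file versions that the cleaner has not yet removed, so several file names sharing one file ID may be versions of a single file group.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_set, count, countDistinct, lit}

object DupeFileSpread {
  // For every record key that occurs more than once, report how many rows
  // carry it, how many distinct base files it appears in, and which files.
  def dupeFileSpread(df: DataFrame): DataFrame =
    df.groupBy("_hoodie_record_key")
      .agg(
        count(lit(1)).as("dupe_cnt"),
        countDistinct("_hoodie_file_name").as("file_cnt"),
        collect_set("_hoodie_file_name").as("files"))
      .filter(col("dupe_cnt") > 1)
      .orderBy(col("dupe_cnt").desc)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dupe-file-spread").getOrCreate()
    // Partition path from the reproduction steps (bucket name is a placeholder).
    val inputPath = "s3://thebucket/data/tables/events/env_id=123/week=20220711"
    dupeFileSpread(spark.read.parquet(inputPath)).show(20, truncate = false)
    spark.stop()
  }
}
```

A `file_cnt` of 1 means the duplicates sit inside a single file; a value above 1 means the same key was written to several files.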
@ehurheap
22/07/22 19:18:58 ERROR SparkMain: Fail to execute commandString
org.apache.spark.sql.AnalysisException: cannot resolve '_hoodie_record_key' given input columns: []; line 5 pos 15;
'UnresolvedHaving ('dupe_cnt > 1)
+- 'Aggregate ['_hoodie_record_key], ['_hoodie_record_key AS dupe_key#0, count(1) AS dupe_cnt#1L]
+- SubqueryAlias htbl_1658517533303
+- View (htbl_1658517533303, [])
+- LocalRelation <empty>
The plan ends in +- LocalRelation <empty>, so it looks like no complete files were found under the path you gave. Can you share the detailed log from org.apache.hudi.cli.DedupeSparkJob?
The INFO log line to look for is: List of files under partition: xxx => yyyy
Confirming that the list of files is empty:
List of files under partition: () =>
I did some debugging and this list seems to be empty because the timeline in each of the relevant HoodieFileGroups is empty:
From HoodieFileGroup.java:
public Stream<FileSlice> getAllFileSlices() {
if (!timeline.empty()) {
return fileSlices.values().stream().filter(this::isFileSliceCommitted);
}
return Stream.empty();
}
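To make the effect of that guard concrete, here is a deliberately simplified Scala model of it. None of these types are Hudi's real classes: `FileSlice`, `Timeline`, and `FileGroup` are hypothetical stand-ins, and the "committed" check is reduced to a set lookup. It only illustrates why an empty timeline yields an empty file list even when the slices exist on storage.

```scala
// Hypothetical, simplified stand-ins for the Hudi classes quoted above.
final case class FileSlice(baseInstantTime: String, baseFile: String)

final case class Timeline(completedInstants: Set[String]) {
  def empty: Boolean = completedInstants.isEmpty
  def contains(instant: String): Boolean = completedInstants.contains(instant)
}

final case class FileGroup(timeline: Timeline, fileSlices: Seq[FileSlice]) {
  // Mirrors the guard: with an empty timeline nothing is returned,
  // regardless of how many slices the file group actually holds.
  def getAllFileSlices: Seq[FileSlice] =
    if (!timeline.empty) fileSlices.filter(s => timeline.contains(s.baseInstantTime))
    else Seq.empty
}

object FileGroupDemo extends App {
  // Instant time and file name are made-up illustration values.
  val slices = Seq(FileSlice("20220711101500", "f1_0-1-1_20220711101500.parquet"))

  val withTimeline = FileGroup(Timeline(Set("20220711101500")), slices)
  val emptyTimeline = FileGroup(Timeline(Set.empty), slices)

  println(withTimeline.getAllFileSlices.map(_.baseFile))  // the one committed slice
  println(emptyTimeline.getAllFileSlices.map(_.baseFile)) // no slices at all
}
```

With no slices returned, the dedupe job has no files to register as a table, which is consistent with the `LocalRelation <empty>` in the stack trace.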
Is there some assumption or rule that repair deduplicate can only be run under certain conditions? For example, before compaction has happened?
@ehurheap It looks like repair deduplicate targets base files only. Do you see duplicate data when you query the table or run other operations? For log files, I think records with a duplicate primary key are combined at read time, so no repair is needed.
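A possible way to check this is to run the same duplicate count under both Hudi query types and compare. The sketch below assumes the Hudi Spark bundle is on the classpath and reuses the base path, key columns, and partition filter from the original report. The helper names (`QueryTypeDupes`, `dupes`, `load`) are hypothetical. `read_optimized` reads base files only, while `snapshot` also merges log files on a MOR table.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, count, lit}

object QueryTypeDupes {
  // Record-key columns from the write options in the report.
  val keyCols: Seq[String] = Seq("env_id", "user_id", "event_id")

  // Keys that occur more than once in the given DataFrame.
  def dupes(df: DataFrame): DataFrame =
    df.groupBy(keyCols.map(col): _*)
      .agg(count(lit(1)).as("cnt"))
      .filter(col("cnt") > 1)

  // Load one partition of the table with the requested query type.
  def load(spark: SparkSession, basePath: String, queryType: String): DataFrame =
    spark.read.format("hudi")
      .option("hoodie.datasource.query.type", queryType)
      .load(basePath)
      .filter(col("env_id") === 123 && col("week") === "20220711")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("query-type-dupes").getOrCreate()
    val basePath = "s3://thebucket/data/tables/events" // placeholder bucket from the report
    for (qt <- Seq("read_optimized", "snapshot")) {
      println(s"$qt: ${dupes(load(spark, basePath, qt)).count()} duplicated keys")
    }
    spark.stop()
  }
}
```

If the duplicated keys show up under both query types, they are present in the base files rather than being an artifact of unmerged log files.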
This set of commands worked for me. I followed our quick start guide, only setting the write operation to "bulk_insert", and wrote the same insert DataFrame to the hudi table twice.
spark.sql("select partitionpath, count(uuid) from hudi_trips_snapshot group by 1 order by 1").show(false)
+------------------------------------+-----------+
|partitionpath |count(uuid)|
+------------------------------------+-----------+
|americas/brazil/sao_paulo |6 |
|americas/united_states/san_francisco|10 |
|asia/india/chennai |4 |
+------------------------------------+-----------+
spark.sql("select partitionpath, uuid, fare from hudi_trips_snapshot order by 1,2").show(false)
+------------------------------------+------------------------------------+------------------+
|partitionpath |uuid |fare |
+------------------------------------+------------------------------------+------------------+
|americas/brazil/sao_paulo |69f49197-cc8e-4d91-b745-dbe6e83a4b5a|66.62084366450246 |
|americas/brazil/sao_paulo |69f49197-cc8e-4d91-b745-dbe6e83a4b5a|66.62084366450246 |
|americas/brazil/sao_paulo |be747659-1a1d-4445-b5ae-ccfe5104e69e|43.4923811219014 |
|americas/brazil/sao_paulo |be747659-1a1d-4445-b5ae-ccfe5104e69e|43.4923811219014 |
|americas/brazil/sao_paulo |ebe7576f-18c6-4cb8-b119-94ac7bb518c2|34.158284716382845|
|americas/brazil/sao_paulo |ebe7576f-18c6-4cb8-b119-94ac7bb518c2|34.158284716382845|
|americas/united_states/san_francisco|07ce908b-1f5e-405f-98fb-37796264d7c3|19.179139106643607|
|americas/united_states/san_francisco|07ce908b-1f5e-405f-98fb-37796264d7c3|19.179139106643607|
|americas/united_states/san_francisco|09e8d4d3-f6ce-425a-8b8a-1858d9aea981|64.27696295884016 |
|americas/united_states/san_francisco|09e8d4d3-f6ce-425a-8b8a-1858d9aea981|64.27696295884016 |
|americas/united_states/san_francisco|15b605fb-4ce7-4f37-a1ec-79989ae9b90f|33.92216483948643 |
|americas/united_states/san_francisco|15b605fb-4ce7-4f37-a1ec-79989ae9b90f|33.92216483948643 |
|americas/united_states/san_francisco|ac572297-8aa7-4cfc-be1b-280ab8b6c783|27.79478688582596 |
|americas/united_states/san_francisco|ac572297-8aa7-4cfc-be1b-280ab8b6c783|27.79478688582596 |
|americas/united_states/san_francisco|de8d8395-f3aa-412a-bb49-78f4e2537677|93.56018115236618 |
|americas/united_states/san_francisco|de8d8395-f3aa-412a-bb49-78f4e2537677|93.56018115236618 |
|asia/india/chennai |047c2afe-3cb1-4976-badb-10042a33e9e9|41.06290929046368 |
|asia/india/chennai |047c2afe-3cb1-4976-badb-10042a33e9e9|41.06290929046368 |
|asia/india/chennai |f74ac5dc-a908-4669-b607-8a382ffbf103|17.851135255091155|
|asia/india/chennai |f74ac5dc-a908-4669-b607-8a382ffbf103|17.851135255091155|
+------------------------------------+------------------------------------+------------------+
de-dup via hudi-cli
Launch hudi-cli
connect --path /tmp/hudi_trips_cow/
set --conf SPARK_HOME=/Users/nsb/Documents/tools/spark-2.4.7-bin-hadoop2.7
repair deduplicate --duplicatedPartitionPath "americas/brazil/sao_paulo" --repairedOutputPath "/tmp/dedupedData/" --sparkMemory 1g --sparkMaster local[2] --dedupeType "upsert_type"
After this, I see some parquet files in "/tmp/dedupedData/". I tried reading them via spark-shell to check for duplicates.
val df = spark.read.format("parquet").load("/tmp/dedupedData/")
df.registerTempTable("tbl1")
spark.sql("select partitionpath, uuid, fare from tbl1 order by 1,2").show(false)
+-------------------------+------------------------------------+------------------+
|partitionpath |uuid |fare |
+-------------------------+------------------------------------+------------------+
|americas/brazil/sao_paulo|69f49197-cc8e-4d91-b745-dbe6e83a4b5a|66.62084366450246 |
|americas/brazil/sao_paulo|be747659-1a1d-4445-b5ae-ccfe5104e69e|43.4923811219014 |
|americas/brazil/sao_paulo|ebe7576f-18c6-4cb8-b119-94ac7bb518c2|34.158284716382845|
+-------------------------+------------------------------------+------------------+
Let me know if this helps.
@KnightChess , yes there is duplicate data when running a query on the data:
val path="s3://<bucketpath>/tables/events"
val events = spark.read.format("hudi").option("hoodie.datasource.query.type", "read_optimized").load(path)
events.createOrReplaceTempView("events")
val dupeQuery =
"""select env_id, event_id, user_id, count(*) from events
| where env_id = 123 and week = '20220711'
| group by env_id, event_id, user_id
| having count(*) > 1
|""".stripMargin
val res = spark.sql(dupeQuery)
res: org.apache.spark.sql.DataFrame = [env_id: bigint, event_id: bigint ... 2 more fields]
scala> res.show
+---------+----------------+----------------+--------+
| env_id| event_id| user_id|count(1)|
+---------+----------------+----------------+--------+
| 123|4401289435098557|3813718218593807| 2|
| 123|7627782625576713|4299498150167280| 2|
| 123|7972131523381176|4461192992664821| 2|
...
+---------+----------------+----------------+--------+
only showing top 20 rows
Sorry, I am a bit confused. If I am not wrong, this is your issue: you somehow ended up with duplicates in your hudi table, tried to run hudi-cli to dedup, ran into issues, and posted the stacktrace.
I gave you the commands I used and showed that they worked for me.
But I could not gauge your response to that. The repair dedup command does not fix duplicates in the table. It dumps the deduped records to a separate location as parquet data. You may need to delete the matching entries from hudi and load the parquet data again. I understand it's not easy, but that's the only option we have.
Let me know how we can help further.
Sorry for the confusion, @nsivabalan. I reviewed the commands you specified to verify that they match what I tried. The main differences between what you did and our situation:
- Our hudi table was loaded by 2 separate processes: one bulk_insert job and one streaming ingest job.
- In addition, our hudi table is MOR.
I can run a spark query similar to yours and verify there are duplicates in the given partition.
val dupePath="s3://<bucket>/data/tables/events/env_id=123456789/week=20220711"
val ddF = spark.read.parquet(dupePath)
ddF.createOrReplaceTempView("hoh")
val dupQ = s"""
|select _hoodie_partition_path, _hoodie_record_key, count(*)
| from hoh group by _hoodie_partition_path, _hoodie_record_key
| order by count(*) desc
|""".stripMargin
spark.sql(dupQ).show(false)
sample output:
+------------------------------+-------------------------------------------------------------------+--------+
|_hoodie_partition_path |_hoodie_record_key |count(1)|
+------------------------------+-------------------------------------------------------------------+--------+
|env_id=123456789/week=20220711|env_id:123456789,user_id:7806358957060773,event_id:5758152328327473|3 |
|env_id=123456789/week=20220711|env_id:123456789,user_id:5403332495352077,event_id:3536309858058402|2 |
|env_id=123456789/week=20220711|env_id:123456789,user_id:4713648045477470,event_id:8717656941318904|2 |
|env_id=123456789/week=20220711|env_id:123456789,user_id:2025910439767252,event_id:8549159234261693|2 |
|env_id=123456789/week=20220711|env_id:123456789,user_id:7507696929673571,event_id:3179806702204642|2 |
|env_id=123456789/week=20220711|env_id:123456789,user_id:7301684312119961,event_id:717438862368076 |2 |
But when I run this hudi-cli command:
hudi:events->repair deduplicate --duplicatedPartitionPath "env_id=123/week=20220711" --repairedOutputPath /tmp/hhdeduplicates --sparkMaster local[2] --sparkMemory 4G --dryrun true --dedupeType "upsert_type"
The output I get is:
22/08/12 16:27:21 ERROR SparkMain: Fail to execute commandString
org.apache.spark.sql.AnalysisException: cannot resolve '_hoodie_record_key' given input columns: []; line 5 pos 15;
'UnresolvedHaving ('dupe_cnt > 1)
+- 'Aggregate ['_hoodie_record_key], ['_hoodie_record_key AS dupe_key#0, count(1) AS dupe_cnt#1L]
+- SubqueryAlias htbl_1660321638341
+- View (`htbl_1660321638341`, [])
+- LocalRelation <empty>
hudi:events->
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:179)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:175)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:573)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:573)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:181)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:193)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:193)
at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:204)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:209)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
...
Deduplication failed!
There are no files in the --repairedOutputPath location. I understand that if there were data there, we would use that deduplicated data to replace what is currently in hudi: first deleting the duplicates from hudi, then loading the deduped data from the repairedOutputPath location.
But since we have no repaired deduplicated data, we are stuck. Does that make sense? Have I missed something?
Got it. Isn't your partition path --duplicatedPartitionPath "env_id=123456789/week=20220711"? I see you tried "env_id=123/week=20220711".
123 -> 123456789.
I ask because I don't see any other difference. The only other difference is that I did not enable hive style partitioning in my script. I can retry with that enabled.
OK, I guess the issue is with the MOR table. Dedup is currently supported only for COW tables. Let me file a jira.
https://github.com/apache/hudi/issues/6194 You can follow the jira for progress. We will try to get it into the next release. Thanks for reporting. If you don't have any more questions, feel free to close out the github issue; we can use the jira to follow up.
Thanks @nsivabalan. (I must have flubbed the partition path in my copy/pasting - sorry about that.) I'll close this ticket and follow the jira.
Reopening: @nsivabalan what is the link for the jira? The link posted above is for this ticket.
My bad. It's https://issues.apache.org/jira/browse/HUDI-4752