[SUPPORT] Merge Into MOR table works incorrectly
Describe the problem you faced
On a COW table everything is fine, but on MOR (bucket index) MERGE INTO executes the action from the "when matched" clause even on non-existent records.
To Reproduce
- create MOR target table (simple bucket index with 1 bucket)
- insert some records
- delete one record
- merge into target table some records (one that was previously deleted, one that exists in target, one that does not exist in target) with
"update when matched" and "insert when not matched".
Expected behavior
- record that was previously deleted should be inserted as is
- record that existed in target should be updated
- record that did not exist in target should be added
- records that existed only in target should stay without changes
Environment Description
- Hudi version : current master
- Spark version : 3.5
- Hive version :
- Hadoop version :
- Storage (HDFS/S3/GCS..) :
- Running on Docker? (yes/no) :
Additional context
You can add this case to TestMergeIntoTable and run locally:
test("Test Merge Into with record that was previously deleted") {
Seq("cow", "mor").foreach { tableType =>
withTempDir { tmp =>
// create and fill out source table
val tbParquet = generateTableName
spark.sql(
s"""
|create table $tbParquet (
| id int, comb int, col0 int, col1 string
|) using parquet
|location '${tmp.getCanonicalPath}/$tbParquet'
|""".stripMargin)
spark.sql(
s"""
|insert into $tbParquet values
|(3,30,130,'aa3'),
|(5,50,150,'aa5'),
|(6,60,160,'aa6')
|""".stripMargin)
// create target table
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int, comb int, col0 int, col1 string
| ) using hudi
| options(
| type='$tableType',
| primaryKey='id',
| preCombineField='comb',
| 'hoodie.index.type'='BUCKET',
| 'hoodie.bucket.index.num.buckets'='5'
| )
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)
// fill out target table from source
spark.sql(s"insert into $tableName select * from $tbParquet")
checkAnswer(s"select id, col0, col1 from $tableName order by id")(
Seq(3, 130, "aa3"), Seq(5, 150, "aa5"), Seq(6, 160, "aa6")
)
// delete one record from target
spark.sql(s"delete from $tableName where id = 3")
// make sure there is no record with id=3 in target table anymore
checkAnswer(s"select id, col0, col1 from $tableName order by id")(
Seq(5, 150, "aa5"), Seq(6, 160, "aa6")
)
// insert one more record to source
spark.sql(s"insert into $tbParquet values (7,70,170,'aa7')")
// merge into target from source
spark.sql(
s"""
|merge into $tableName t1 using $tbParquet t2 on t1.id = t2.id
| when matched then update set id = t2.id, comb=t2.comb, col0=t2.col0+1, col1='oo'
| when not matched then insert *
|""".stripMargin)
spark.sql(s"select * from $tableName order by id").show()
// id=3 and id=7 should be inserted as is, id=5 and id=6 should be updated
checkAnswer(s"select id, col0, col1 from $tableName order by id")(
Seq(3, 130, "aa3"), Seq(5, 151, "oo"), Seq(6, 161, "oo"), Seq(7, 170, "aa7")
)
}
}
}
Stacktrace
Expected Array([3,130,aa3], [5,151,oo], [6,161,oo], [7,170,aa7]), but got Array([3,131,oo], [5,151,oo], [6,161,oo], [7,170,aa7])
I'll fix it myself. Created a Jira: https://issues.apache.org/jira/browse/HUDI-9342
If the target table is created with only one bucket ('hoodie.bucket.index.num.buckets'='1'), then this merge into results in:
[3,131,oo], [5,151,oo], [6,161,oo], [7,171,oo]
So the issue is not only about deleted records, but about the matched/not matched mechanism itself. Need to check this case with other index types.
So here are the results of this test with different table and index types:
correct:
COW any idx: [3,130,aa3], [5,151,oo], [6,161,oo], [7,170,aa7]
MOR INMEMORY: [3,130,aa3], [5,151,oo], [6,161,oo], [7,170,aa7]
incorrect:
MOR BUCKET 1: [3,131,oo], [5,151,oo], [6,161,oo], [7,171,oo]
MOR BUCKET 5: [3,131,oo], [5,151,oo], [6,161,oo], [7,170,aa7]
MOR SIMPLE: [3,131,oo], [5,151,oo], [6,161,oo], [7,170,aa7]
MOR BLOOM: [3,131,oo], [5,151,oo], [6,161,oo], [7,170,aa7]
MOR GLOBAL_BLOOM: [3,131,oo], [5,151,oo], [6,161,oo], [7,170,aa7]
MOR GLOBAL_SIMPLE: [3,131,oo], [5,151,oo], [6,161,oo], [7,170,aa7]
If I understand it right: the MERGE INTO logic decides between update and insert for every record by its currentLocation, so if HoodieIndex has somehow calculated a currentLocation for a record, this record goes through the "when matched" execution branch, no matter whether the record really exists or not. This logic is fine for the common upsert operation, but as you can see, it's not suitable for merge into.
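A minimal sketch of that decision rule as I understand it (simplified and hypothetical, not the actual Hudi code path; HoodieRecord and getCurrentLocation are real, the routing function is illustrative):

```scala
import org.apache.hudi.common.model.HoodieRecord

// Simplified: a record is routed to the "when matched" branch purely because
// tagLocation() assigned it a currentLocation -- existence is never verified.
def mergeBranch(record: HoodieRecord[_]): String =
  if (record.getCurrentLocation != null) "when matched"   // -> update clause
  else "when not matched"                                 // -> insert clause
```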
@danny0405 @yihua @nsivabalan @jonvex @codope @vinothchandar what do you think is the best way to fix it?
Please give me some advice on the proper place in the code to change this logic: the indexes, HoodieSparkSqlWriter, the MergeInto command, somewhere else?
Can you summarize the root cause of the error and give a high-level example of why it is incorrect? I'm still confused about the context you gave here.
From documentation:
The MERGE INTO statement is similar to the UPDATE statement, but it allows you to specify different actions for matched and unmatched records.
Action matched: should be performed on records that already exist in both the target and the source table.
Action not matched: should be performed on records that do not exist in the target table.
If the target table is COW, it works exactly as written above.
But for a MOR target, in fact, it doesn't really check the existence of records in the target table when deciding which action to execute: match or not match.
This decision is based on the presence of the currentLocation field, which is set by the HoodieIndex.tagLocation() procedure.
For example, if we use the simple bucket index with bucket number = 1 and run a merge into command, tagLocation produces the same currentLocation for all source records (no matter whether those records exist in the target table or not), so only the matched action will be executed for all source records. We get incorrect results because non-existent records should be produced by the not matched action.
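To illustrate why a bucket index tags everything, here is a hedged sketch of how a simple bucket index derives a location (simplified; the real Hudi implementation hashes the configured key fields differently):

```scala
// Simplified illustration: a bucket index computes the target bucket purely
// from a hash of the record key -- no lookup against existing data happens.
def bucketId(recordKey: String, numBuckets: Int): Int =
  (recordKey.hashCode & Integer.MAX_VALUE) % numBuckets

// With numBuckets = 1 every key maps to bucket 0, so every source record is
// tagged with a currentLocation and lands in the "when matched" branch,
// whether or not it exists in the target table.
assert((0 to 100).forall(id => bucketId(id.toString, 1) == 0))
```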
I guess MERGE INTO can only work correctly for a global index, such as BLOOM_FILTER or HBASE; the BUCKET index is kind of local, per-partition, and used mainly for streaming ingestion.
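A hedged sketch of the local-vs-global distinction being described here (hypothetical types, not Hudi's actual index interface): a lookup-based global index can answer "this key is absent", while a hash-based bucket index only computes where the key would live:

```scala
// Hypothetical simplification of the two index families.
trait KeyIndex { def tag(recordKey: String): Option[String] }

// A lookup-based (global) index can return None for keys that do not exist.
class GlobalLookupIndex(keyToFile: Map[String, String]) extends KeyIndex {
  def tag(recordKey: String): Option[String] = keyToFile.get(recordKey)
}

// A hash-based bucket index always returns a location, existing key or not.
class BucketHashIndex(numBuckets: Int) extends KeyIndex {
  def tag(recordKey: String): Option[String] =
    Some(s"bucket-${(recordKey.hashCode & Integer.MAX_VALUE) % numBuckets}")
}
```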
But it's not working correctly even with GLOBAL_BLOOM. For COW it works well with any type of index, but for MOR it works correctly only with INMEMORY.
Could this behavior be called a known limitation of the MERGE INTO statement? I think not.
Hmm, then we might need to debug in detail what stage is missing for MOR. Is it the missing location tag you mentioned above? Does it incur a large cost if we supplement the missing logic?
CC @codope
It seems I've found the cause of this incorrect behavior, in HoodieAppendHandle and ExpressionPayload (code snippets omitted).
And now I need your ideas on how to fix this. @danny0405 @pengzhiwei2018
One way is to check the HoodieOperation of the record, but first of all it needs to be set up correctly. It looks like the issue is only related to DELETEs, so I guess maybe we just need to distinguish the deletes by HoodieRecord.isDelete.
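If I read the suggestion right, a minimal sketch of the idea (illustrative only, not where the fix would actually live; HoodieOperation and getOperation are real Hudi types, and the sketch assumes getOperation is actually populated, which is part of what needs setting up):

```scala
import org.apache.hudi.common.model.{HoodieOperation, HoodieRecord}

// Illustrative only: treat a tagged record as a real match only when the
// latest version of it is not a delete tombstone.
def isRealMatch(tagged: HoodieRecord[_]): Boolean =
  tagged.getCurrentLocation != null &&
    tagged.getOperation != HoodieOperation.DELETE
```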
@danny0405 Maybe I don't understand you right, but I disagree about "the issue is only related to DELETEs". I think the issue relates to deciding which action to execute on each incoming record (match or not_match).
Let me clarify the case above:
- at the beginning, we have target table of type MOR with records: (3,130,'aa3'), (5,150,'aa5'), (6,160,'aa6')
- then we deleted record with id=3, so we have in target: (5,150,'aa5'), (6,160,'aa6')
- source table contains these records: (3,130,'aa3'), (5,150,'aa5'), (6,160,'aa6'), (7,170,'aa7')
- run merge into target from source: when matched then update with modification, when not matched then insert as is. So the target table should become: (3,130,'aa3'), (5,151,'oo'), (6,161,'oo'), (7,170,'aa7'):
  id=3 should be inserted as is (3,130,'aa3') (as it was previously deleted from target)
  id=7 should be inserted as is (7,170,'aa7') (as it is new to target)
  id=5 should be updated (150 -> 151, 'aa5' -> 'oo')
  id=6 should be updated (160 -> 161, 'aa6' -> 'oo')
It works like this with COW (any index) and MOR (INMEMORY index only). For BUCKET with bucket number=1, all records are updated (because a currentLocation is always tagged for all records). For other indexes: 3, 5, 6 are updated and 7 is inserted (the previously deleted record 3 is taken as "matched").
This is because it doesn't check the existence of records in the target table when deciding which action to execute (match or not_match); it just looks at currentLocation in the HoodieRecord.
And as you can see, for a MOR table with any type of index (except INMEMORY), the presence of a currentLocation in a HoodieRecord should not be taken as a sign that the record exists.
If we somehow set up the HoodieOperation of the record correctly (really check the existence of records), we slow down performance significantly (especially for the BUCKET index). If we don't, MERGE INTO keeps working incorrectly with MOR tables.
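To make that trade-off concrete, a hedged sketch (hypothetical signatures, not Hudi's index API) of what a real existence check adds on top of hash-only bucket tagging:

```scala
// Hash-only tagging: O(1), no I/O -- today's behavior for the bucket index.
def tagByHash(recordKey: String, numBuckets: Int): Int =
  (recordKey.hashCode & Integer.MAX_VALUE) % numBuckets

// Existence-checking tagging: correct for MERGE INTO, but keysInBucket would
// have to be built by scanning the bucket's base file plus its log files --
// extra I/O per merge, which is the performance cost being discussed.
def tagWithLookup(recordKey: String,
                  numBuckets: Int,
                  keysInBucket: Int => Set[String]): Option[Int] = {
  val bucket = tagByHash(recordKey, numBuckets)
  if (keysInBucket(bucket).contains(recordKey)) Some(bucket) else None
}
```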
You are right. Hudi upsert/insert into a table with primary keys already has the ability to update records, so there is no need to use MERGE INTO; MERGE INTO is mainly designed for batch ingestion instead of streaming. And for the Spark BLOOM index, only updates/deletes can be written into logs, so I guess we just need to fix it (if it is not a delete, then it should be an update).
Hi guys, any updates on this issue now?
Yes, we are working on it.
Hi, I took a quick look at this. It reminds me of the challenges with global index partition-path updates: org.apache.hudi.index.HoodieIndexUtils#mergeForPartitionUpdatesIfNeeded. Maybe it is different, but taking a look at how we do that might give you some ideas.
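For context, a hedged sketch of the pattern that method follows as I understand it (hypothetical types, not the actual method body): when a global index sees a key whose partition changed, the write is expanded into a delete at the old location plus an insert at the new one:

```scala
// Hypothetical simplification of handling a partition-path update.
case class TaggedKey(key: String, oldPartition: Option[String], newPartition: String)

def planWrites(r: TaggedKey): Seq[(String, String)] = r.oldPartition match {
  case Some(old) if old != r.newPartition =>
    Seq(("DELETE", old), ("INSERT", r.newPartition)) // move the record
  case _ =>
    Seq(("UPSERT", r.newPartition))                  // plain update or insert
}
```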
Hi, just wanted to update: we are working on the design for the fix. It is going to be a pretty major change to MIT (MERGE INTO), so it will take some time.