[BUG] Found duplicate data when using Iceberg
Hi~
I am new to StarRocks.
I found a problem: I use a format-version 2 (v2) Iceberg table and use Spark to upsert. The primary key is (id, ts).
CREATE TABLE upsert_demo( id bigint, addr string, ts string) PARTITIONED BY (ts) TBLPROPERTIES ( 'current-schema'='{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"id","required":false,"type":"long"},{"id":2,"name":"addr","required":false,"type":"string"},{"id":3,"name":"ts","required":false,"type":"string"}]}', 'current-snapshot-id'='5208014445283775116', 'current-snapshot-summary'='{"added-data-files":"1","deleted-data-files":"2","removed-equality-delete-files":"1","removed-delete-files":"1","added-records":"9","deleted-records":"15","added-files-size":"1049719","removed-files-size":"3148815","removed-equality-deletes":"3","changed-partition-count":"1","total-records":"9","total-files-size":"2101152","total-data-files":"1","total-delete-files":"2","total-position-deletes":"3","total-equality-deletes":"9"}', 'current-snapshot-timestamp-ms'='1713261667810', 'default-partition-spec'='{"spec-id":0,"fields":[{"name":"ts","transform":"identity","source-id":3,"field-id":1000}]}', 'dlc.ao.data.govern.sorted.keys'='id', 'smart-optimizer.inherit'='default', 'snapshot-count'='3', 'table_type'='ICEBERG', 'uuid'='b4be3ec8-de16-4107-a11c-a8e69d1feec3', 'write.distribution-mode'='hash', 'write.merge.mode'='merge-on-read', 'write.metadata.delete-after-commit.enabled'='true', 'write.metadata.metrics.default'='full', 'write.metadata.previous-versions-max'='100', 'write.parquet.bloom-filter-enabled.column.id'='true', 'write.update.mode'='merge-on-read', 'write.upsert.enabled'='true', 'format-version'='2')
I get 9 rows of data when I query with Spark.
But I get 18 rows of data when I query with StarRocks.
Can anyone help me see where the problem lies?
same here https://github.com/StarRocks/starrocks/pull/41267#issuecomment-2093788651
hi, what's your starrocks version?
hi, what's your starrocks version?
The starrocks version is 3.1.10
@mlbzssk hi, how do you upsert your table. could you share the reproduce process? I can't reproduce the issue using 3.1.10
@mlbzssk hi, how do you upsert your table. could you share the reproduce process? I can't reproduce the issue using 3.1.10 @gaydba @stephen-shelby I also use the 3.1.10。 As i can see, this problem similar to https://github.com/apache/iceberg/issues/6153 And the version which I use have the patch https://github.com/StarRocks/starrocks/pull/41267#issuecomment-2093788651. the reproduce procedure is like: 1、create a mysql table,insert some data create table mysql_table(id int primary key ,addr varchar(50),ts varchar(50) ); insert into mysql_table values(1,"aaa","2024-04-30"); insert into mysql_table values(2,"bb","2024-04-30"); insert into mysql_table values(3,"ccc","2024-04-30"); 2、use spark-sql create a table with not primary key CREATE TABLE
spark_table( id int, addr string, ts string) USING iceberg PARTITIONED BY (ts) TBLPROPERTIES('format-version'='2', 'write.upsert.enabled'='true'); 3、use flink sql create 2 tables CREATE TABLE default_catalog.default_database.flink_table_to_mysql ( id int, addr string, ts string, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '', 'port' = '', 'username' = '', 'password' = '', 'database-name' = 'iceberg_query_data', 'table-name' = 'mysql_table' );
CREATE TABLE flink_table_to_spark (
id INT COMMENT 'unique id',
addr STRING NOT NULL,
ts string,
PRIMARY KEY(id,ts) NOT ENFORCED
) PARTITIONED BY (ts)
WITH (
'connector'='iceberg',
'catalog-name'='hive_prod',
'catalog-database'='default',
'catalog-table'='spark_table',
'uri'='thrift://',
'warehouse'='hdfs://'
);
4、let the data flow from default_catalog.default_database.flink_table_to_mysql to flink_table_to_spark
INSERT INTO
flink_table_to_spark /*+ OPTIONS('upsert-enabled'='true') */
SELECT * from default_catalog.default_database.flink_table_to_mysql;
5、update one row data in mysql
update mysql_table set addr='hahahaha' where id = 1;
6、Query the data with both StarRocks and Spark: you will find that the StarRocks result has 2 rows, while Spark returns only 1 row.
CREATE EXTERNAL CATALOG hive_catalog_iceberg
PROPERTIES
(
"type" = "iceberg",
"iceberg.catalog.type" = "hive",
"hive.metastore.uris" = "thrift://"
);
resolved