
[BUG] Duplicate data when querying an Iceberg table

mlbzssk opened this issue • 6 comments

Hi~ I am new to StarRocks and I've run into a problem. I use an Iceberg format-version 2 table and Spark to upsert. The primary key is (id, ts).

```sql
CREATE TABLE upsert_demo (
  id bigint,
  addr string,
  ts string)
PARTITIONED BY (ts)
TBLPROPERTIES (
  'current-schema'='{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"id","required":false,"type":"long"},{"id":2,"name":"addr","required":false,"type":"string"},{"id":3,"name":"ts","required":false,"type":"string"}]}',
  'current-snapshot-id'='5208014445283775116',
  'current-snapshot-summary'='{"added-data-files":"1","deleted-data-files":"2","removed-equality-delete-files":"1","removed-delete-files":"1","added-records":"9","deleted-records":"15","added-files-size":"1049719","removed-files-size":"3148815","removed-equality-deletes":"3","changed-partition-count":"1","total-records":"9","total-files-size":"2101152","total-data-files":"1","total-delete-files":"2","total-position-deletes":"3","total-equality-deletes":"9"}',
  'current-snapshot-timestamp-ms'='1713261667810',
  'default-partition-spec'='{"spec-id":0,"fields":[{"name":"ts","transform":"identity","source-id":3,"field-id":1000}]}',
  'dlc.ao.data.govern.sorted.keys'='id',
  'smart-optimizer.inherit'='default',
  'snapshot-count'='3',
  'table_type'='ICEBERG',
  'uuid'='b4be3ec8-de16-4107-a11c-a8e69d1feec3',
  'write.distribution-mode'='hash',
  'write.merge.mode'='merge-on-read',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.metrics.default'='full',
  'write.metadata.previous-versions-max'='100',
  'write.parquet.bloom-filter-enabled.column.id'='true',
  'write.update.mode'='merge-on-read',
  'write.upsert.enabled'='true',
  'format-version'='2')
```
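For context on what `'write.upsert.enabled'='true'` on a format-version 2 table implies: each upsert writes an equality delete for the key plus a new data row, and the reader must apply those deletes at scan time. The sketch below is a simplified, hypothetical Python model of that merge-on-read path (the function and field names are mine, not Iceberg APIs). A reader that skips the equality deletes returns every row ever written, which is exactly the doubled row count reported here.

```python
# Hypothetical minimal model of an Iceberg v2 merge-on-read scan.
# Data files hold every written row; equality delete files record key
# tuples whose OLDER rows (lower sequence number) must be dropped.

def read_snapshot(data_rows, equality_deletes, key=("id", "ts")):
    """Return the rows that survive after applying equality deletes.

    data_rows:        list of (sequence_number, row_dict)
    equality_deletes: list of (sequence_number, key_tuple)
    A delete removes rows whose key matches and whose sequence number
    is strictly lower than the delete's.
    """
    surviving = []
    for seq, row in data_rows:
        k = tuple(row[c] for c in key)
        deleted = any(dseq > seq and dkey == k
                      for dseq, dkey in equality_deletes)
        if not deleted:
            surviving.append(row)
    return surviving

# An upsert of id=1 writes an equality delete for (1, "2024-04-30")
# and the replacement row, both at sequence number 2.
data = [
    (1, {"id": 1, "addr": "aaa", "ts": "2024-04-30"}),
    (2, {"id": 1, "addr": "hahahaha", "ts": "2024-04-30"}),
]
deletes = [(2, (1, "2024-04-30"))]

print(read_snapshot(data, deletes))  # only the "hahahaha" row survives
```

If the reader ignores `deletes`, both rows come back, one duplicate per upserted key.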

I get 9 rows of data when I query with Spark.

But! I get 18 rows of data when I query with StarRocks.

Can anyone help me see where the problem lies?

mlbzssk, Apr 25 '24

same here https://github.com/StarRocks/starrocks/pull/41267#issuecomment-2093788651

gaydba, May 03 '24

hi, what's your starrocks version?

stephen-shelby, May 05 '24

> same here #41267 (comment)

Thanks! I will look into it.

mlbzssk, May 06 '24

> hi, what's your starrocks version?

The StarRocks version is 3.1.10.

mlbzssk, May 06 '24

@mlbzssk hi, how do you upsert your table? Could you share the steps to reproduce? I can't reproduce the issue using 3.1.10.

stephen-shelby, May 08 '24

> @mlbzssk hi, how do you upsert your table? Could you share the steps to reproduce? I can't reproduce the issue using 3.1.10.

@gaydba @stephen-shelby I also use 3.1.10. As far as I can see, this problem is similar to https://github.com/apache/iceberg/issues/6153, and the version I use already includes the patch from https://github.com/StarRocks/starrocks/pull/41267#issuecomment-2093788651.

The steps to reproduce are:

1. Create a MySQL table and insert some data:

```sql
CREATE TABLE mysql_table (
  id int PRIMARY KEY,
  addr varchar(50),
  ts varchar(50));
INSERT INTO mysql_table VALUES (1, 'aaa', '2024-04-30');
INSERT INTO mysql_table VALUES (2, 'bb', '2024-04-30');
INSERT INTO mysql_table VALUES (3, 'ccc', '2024-04-30');
```

2. Use spark-sql to create a table without a primary key:

```sql
CREATE TABLE spark_table (
  id int,
  addr string,
  ts string)
USING iceberg
PARTITIONED BY (ts)
TBLPROPERTIES (
  'format-version'='2',
  'write.upsert.enabled'='true');
```

3. Use Flink SQL to create two tables:

```sql
CREATE TABLE default_catalog.default_database.flink_table_to_mysql (
  id int,
  addr string,
  ts string,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '',
  'port' = '',
  'username' = '',
  'password' = '',
  'database-name' = 'iceberg_query_data',
  'table-name' = 'mysql_table'
);

CREATE TABLE flink_table_to_spark (
  id INT COMMENT 'unique id',
  addr STRING NOT NULL,
  ts string,
  PRIMARY KEY (id, ts) NOT ENFORCED
) PARTITIONED BY (ts) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'hive_prod',
  'catalog-database' = 'default',
  'catalog-table' = 'spark_table',
  'uri' = 'thrift://',
  'warehouse' = 'hdfs://'
);
```

4. Let the data flow from default_catalog.default_database.flink_table_to_mysql to flink_table_to_spark:

```sql
INSERT INTO flink_table_to_spark /*+ OPTIONS('upsert-enabled'='true') */
SELECT * FROM default_catalog.default_database.flink_table_to_mysql;
```

5. Update one row in MySQL:

```sql
UPDATE mysql_table SET addr = 'hahahaha' WHERE id = '1';
```

6. Query the data with both StarRocks and Spark. You will find that StarRocks returns 2 rows for the updated key, while Spark returns only 1. The StarRocks external catalog:

```sql
CREATE EXTERNAL CATALOG hive_catalog_iceberg
PROPERTIES (
  "type" = "iceberg",
  "iceberg.catalog.type" = "hive",
  "hive.metastore.uris" = "thrift://"
);
```
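To make the step-6 comparison concrete, here is a small Python sketch (not part of the original report; the row values are taken from the steps above, and the helper name is mine) that flags the duplicate primary key in the StarRocks result:

```python
def has_duplicate_keys(rows, key=("id", "ts")):
    """Return True if any (id, ts) primary-key tuple appears more than once."""
    seen = set()
    for row in rows:
        k = tuple(row[c] for c in key)
        if k in seen:
            return True
        seen.add(k)
    return False

# What Spark returns for id=1 after the update in step 5: one row.
spark_rows = [{"id": 1, "addr": "hahahaha", "ts": "2024-04-30"}]

# What StarRocks returned: both the pre-update and post-update rows.
starrocks_rows = [{"id": 1, "addr": "aaa", "ts": "2024-04-30"},
                  {"id": 1, "addr": "hahahaha", "ts": "2024-04-30"}]

print(has_duplicate_keys(spark_rows))      # False
print(has_duplicate_keys(starrocks_rows))  # True
```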

mlbzssk, May 09 '24

Resolved.

stephen-shelby, May 14 '24