[Bug] [Zeta] If the data exceeds 1 million, duplicate data will appear
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
When the source has more than 1 million rows, the data appears to be duplicated: I get more than 2 million rows in StarRocks, and SeaTunnel does not stop but keeps running, and the amount of duplicate data keeps increasing.
SeaTunnel Version
2.3.3 zeta
SeaTunnel Config
env {
  execution.parallelism = 3
  job.mode = BATCH
  job.name = mkt
}

source {
  Jdbc {
    result_table_name = source
    url = "jdbc:sqlserver://"
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    connection_check_timeout_sec = 100
    user =
    password = ""
    query = """select id, keyword, city, realcity, ver, imei, channel, uid, os, lands,
               server_time, m_usercode, m_username, m_roomid, m_channel, m_addtime, m_updatetime,
               m_searchcode, m_type, dt
               FROM YS_BI.marketing.marketing_share_room_record
            """
  }
}

transform {
}

sink {
  StarRocks {
    source_table_name = [result]
    nodeUrls = ["clickhouse01:8030", "clickhouse02:8030", "clickhouse03:8030"]
    base-url = "jdbc:mysql:loadBalance://clickhouse01:9030,clickhouse02:9030,clickhouse03:9030"
    username = root
    password = ""
    database = dws
    table = mkt_record
    save_mode_create_template = """
      CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
      (
        server_time DATETIME NOT NULL,
        dt DATE NOT NULL,
        ${rowtype_fields}
      ) ENGINE = OLAP
      DUPLICATE KEY(server_time)
      PARTITION BY (dt)
      DISTRIBUTED BY HASH(dt)
    """
  }
}
Running Command
seatunnel.sh -c mkt.conf
Error Exception
duplicate data
Zeta or Flink or Spark Version
Zeta
Java or Scala Version
java 1.8
Screenshots
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Can you give us more details? For example, the StarRocks version and the MySQL & StarRocks DDL.
StarRocks 3.1
Maybe it is this problem. For now, as a workaround, I import with a SQL query using where dt between '' and '' to keep each run under 1 million rows, and run the job many times (a sketch follows the DDL below).
CREATE TABLE `mkt_record` (
`server_time` datetime NOT NULL COMMENT "",
`dt` date NOT NULL COMMENT "",
`id` bigint(20) NULL COMMENT "",
`keyword` varchar(65533) NULL COMMENT "",
`city` varchar(65533) NULL COMMENT "",
`realcity` varchar(65533) NULL COMMENT "",
`ver` varchar(65533) NULL COMMENT "",
`imei` varchar(65533) NULL COMMENT "",
`channel` varchar(65533) NULL COMMENT "",
`uid` int(11) NULL COMMENT "",
`os` varchar(65533) NULL COMMENT "",
`lands` int(11) NULL COMMENT "",
`m_usercode` varchar(65533) NULL COMMENT "",
`m_username` varchar(65533) NULL COMMENT "",
`m_roomid` int(11) NULL COMMENT "",
`m_channel` varchar(65533) NULL COMMENT "",
`m_addtime` datetime NULL COMMENT "",
`m_updatetime` datetime NULL COMMENT "",
`m_searchcode` varchar(65533) NULL COMMENT "",
`m_type` int(11) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`server_time`)
PARTITION BY date_trunc('day', dt)
DISTRIBUTED BY HASH(`dt`)
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"partition_live_number" = "100",
"compression" = "LZ4"
);
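For reference, a minimal sketch of that workaround as a Jdbc source block, reusing the source config from this thread; the two dates are hypothetical placeholders standing in for the elided bounds:

source {
  Jdbc {
    result_table_name = source
    url = "jdbc:sqlserver://"          # connection details as in the original config
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    user = ""
    password = ""
    # Bound each run by dt so a single run stays under ~1 million rows, then
    # repeat with the next window; '2023-01-01' / '2023-01-31' are made-up dates.
    query = """select id, keyword, city, realcity, ver, imei, channel, uid, os, lands,
               server_time, m_usercode, m_username, m_roomid, m_channel, m_addtime, m_updatetime,
               m_searchcode, m_type, dt
               FROM YS_BI.marketing.marketing_share_room_record
               where dt between '2023-01-01' and '2023-01-31'
            """
  }
}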
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.
I encountered this problem in version 2.3.3 syncing from Dameng to GaussDB. The cluster has 3 nodes, and every source record is duplicated 4 times during the run.
When parallelism > 1, you need to set partition_column, partition_upper_bound, and partition_lower_bound; see https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/Jdbc#partition_column-string
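A minimal sketch of that advice for the Jdbc source, assuming a hypothetical numeric id column; the URL, table, and bound values below are placeholders, not taken from this thread:

source {
  Jdbc {
    url = "jdbc:mysql://localhost:3306/test"               # placeholder connection
    driver = com.mysql.cj.jdbc.Driver
    user = root
    password = ""
    query = "select id, keyword from test.example_table"   # hypothetical table
    # With parallelism > 1, give the connector a numeric split column and
    # explicit bounds so each subtask reads a disjoint id range instead of
    # every subtask re-reading the whole result set:
    partition_column = "id"
    partition_lower_bound = 1
    partition_upper_bound = 10000000
    partition_num = 3                                      # usually = execution.parallelism
  }
}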
I also encountered this problem on the Spark engine with SeaTunnel 2.3.3. My config is:
{
  "env" : {
    "job.mode" : "BATCH",
    "job.name" : "ods_wxzj_t_keywords_message",
    "parallelism" : 1
  },
  "source" : [
    {
      "driver" : "com.mysql.cj.jdbc.Driver",
      "url" : "jdbc:mysql://*:3306/weiya_chat?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&tinyInt1isBit=false&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=round",
      "user" : "",
      "password" : "",
      "query" : "SELECT\nid\n,fromUser as fromuser\n,tolist\n,msgtype\n,content\n,keyword\n,status\n,fromUserName as fromusername\n,tolistName as tolistname\n,fromUserId as fromuserid\n,roomId as roomid\n,isRoomChat as isroomchat\n,departmentName as departmentname\n,departmentId as departmentid\n,type\n,msgTime as msgtime\n,createdAt as createdat\n,logId as logid\n,msgStatus as msgstatus\n,groupType as grouptype\n,containMobile as containmobile\n,containAddr as containaddr\nFROM\nweiya_chat.t_keywords_message",
      "plugin_name" : "Jdbc",
      "partition_column" : "id",
      "partition_num" : 8
    }
  ],
  "sink" : [
    {
      "plugin_name" : "Hive",
      "table_name" : "tmp.tmp_dcp_dw_source_ods_wxzj_t_keywords_message_wyqy_seatunnel_3000983",
      "metastore_uri" : "thrift://*:9083"
    }
  ]
}
When I set the parallelism to 1, the problem still exists...