
[Bug] [Zeta] If the data exceeds 1 million, duplicate data will appear

Open matianhe3 opened this issue 1 year ago • 7 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

When the data exceeds 1 million rows, duplicates appear: I get more than 2 million rows in StarRocks, and SeaTunnel does not stop but keeps running, with the number of duplicates continuing to grow.

SeaTunnel Version

2.3.3 zeta

SeaTunnel Config

env {
  execution.parallelism = 3
  job.mode = BATCH
  job.name = mkt
}

source {
  Jdbc {
    result_table_name = source
    url = "jdbc:sqlserver://"
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    connection_check_timeout_sec = 100
    user = 
    password = ""
    query = """select id, keyword, city, realcity, ver, imei, channel, uid, os, lands,
    server_time, m_usercode, m_username, m_roomid, m_channel, m_addtime, m_updatetime,
    m_searchcode, m_type, dt
    FROM YS_BI.marketing.marketing_share_room_record
    """
  }
}

transform {
}

sink {
  StarRocks {
    source_table_name = [result]
    nodeUrls = ["clickhouse01:8030", "clickhouse02:8030", "clickhouse03:8030"]
    base-url = "jdbc:mysql:loadBalance://clickhouse01:9030,clickhouse02:9030,clickhouse03:9030"
    username = root
    password = ""
    database = dws
    table = mkt_record
    save_mode_create_template = """
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
(
  server_time DATETIME NOT NULL,
  dt DATE NOT NULL,
  ${rowtype_fields}
) ENGINE = OLAP
DUPLICATE KEY(server_time)
PARTITION BY (dt)
DISTRIBUTED BY HASH(dt)
"""
  }
}

Running Command

seatunnel.sh -c mkt.conf

Error Exception

duplicate data

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

java 1.8

Screenshots

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

matianhe3 avatar Oct 11 '23 03:10 matianhe3

Can you give us more details? For example, the StarRocks version and the MySQL & StarRocks DDL.

zhilinli123 avatar Oct 11 '23 09:10 zhilinli123

Can you give us more details? For example, the StarRocks version and the MySQL & StarRocks DDL.

starrocks 3.1

Maybe that is the problem. For now, as a workaround, I import via SQL with `where dt between '' and ''` to keep each run under 1 million rows, and run it multiple times.
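The date-window workaround described above might look like the following query (the date bounds were elided in the original comment; the ones shown here are hypothetical placeholders):

```sql
-- Run once per window, choosing bounds so each window
-- stays under ~1 million rows
SELECT id, keyword, city, realcity, ver, imei, channel, uid, os, lands,
       server_time, m_usercode, m_username, m_roomid, m_channel,
       m_addtime, m_updatetime, m_searchcode, m_type, dt
FROM YS_BI.marketing.marketing_share_room_record
WHERE dt BETWEEN '<start-date>' AND '<end-date>';
```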

CREATE TABLE `mkt_record` (
  `server_time` datetime NOT NULL COMMENT "",
  `dt` date NOT NULL COMMENT "",
  `id` bigint(20) NULL COMMENT "",
  `keyword` varchar(65533) NULL COMMENT "",
  `city` varchar(65533) NULL COMMENT "",
  `realcity` varchar(65533) NULL COMMENT "",
  `ver` varchar(65533) NULL COMMENT "",
  `imei` varchar(65533) NULL COMMENT "",
  `channel` varchar(65533) NULL COMMENT "",
  `uid` int(11) NULL COMMENT "",
  `os` varchar(65533) NULL COMMENT "",
  `lands` int(11) NULL COMMENT "",
  `m_usercode` varchar(65533) NULL COMMENT "",
  `m_username` varchar(65533) NULL COMMENT "",
  `m_roomid` int(11) NULL COMMENT "",
  `m_channel` varchar(65533) NULL COMMENT "",
  `m_addtime` datetime NULL COMMENT "",
  `m_updatetime` datetime NULL COMMENT "",
  `m_searchcode` varchar(65533) NULL COMMENT "",
  `m_type` int(11) NULL COMMENT ""
) ENGINE=OLAP 
DUPLICATE KEY(`server_time`)
PARTITION BY date_trunc('day', dt)
DISTRIBUTED BY HASH(`dt`)
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"partition_live_number" = "100",
"compression" = "LZ4"
);

matianhe3 avatar Oct 12 '23 07:10 matianhe3

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] avatar Nov 12 '23 00:11 github-actions[bot]

I encountered this problem in version 2.3.3 syncing from Dameng to GaussDB. The cluster has 3 nodes. Every source record had 4 duplicates during the run.

igaotang avatar Nov 15 '23 05:11 igaotang

When parallelism > 1, you need to set partition_column, partition_upper_bound, and partition_lower_bound. https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/Jdbc#partition_column-string
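Applied to the reporter's config, this would mean adding partition options to the Jdbc source so the three parallel readers each scan a disjoint `id` range instead of all running the full query. A minimal sketch (the bound values are hypothetical and would need to match the actual min/max of `id` in the source table):

```hocon
source {
  Jdbc {
    result_table_name = source
    url = "jdbc:sqlserver://"
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    query = """select id, keyword, ... from YS_BI.marketing.marketing_share_room_record"""
    # Split the query into non-overlapping ranges so parallel
    # readers do not each re-read the whole table.
    partition_column = "id"
    partition_lower_bound = 1        # hypothetical: MIN(id)
    partition_upper_bound = 2000000  # hypothetical: MAX(id)
    partition_num = 3                # match execution.parallelism
  }
}
```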

liugddx avatar Dec 22 '23 05:12 liugddx

I also encountered this problem on the Spark engine with SeaTunnel 2.3.3. My config is:

{
    "env" : {
        "job.mode" : "BATCH",
        "job.name" : "ods_wxzj_t_keywords_message",
        "parallelism" : 1
    },
    "source" : [
        {
            "driver" : "com.mysql.cj.jdbc.Driver",
            "url" : "jdbc:mysql://*:3306/weiya_chat?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&tinyInt1isBit=false&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=round",
            "user" : "",
            "password" : "",
            "query" : "SELECT\nid\n,fromUser as fromuser\n,tolist\n,msgtype\n,content\n,keyword\n,status\n,fromUserName as fromusername\n,tolistName as tolistname\n,fromUserId as fromuserid\n,roomId as roomid\n,isRoomChat as isroomchat\n,departmentName as departmentname\n,departmentId as departmentid\n,type\n,msgTime as msgtime\n,createdAt as createdat\n,logId as logid\n,msgStatus as msgstatus\n,groupType as grouptype\n,containMobile as containmobile\n,containAddr as containaddr\nFROM\nweiya_chat.t_keywords_message",
            "plugin_name" : "Jdbc",
            "partition_column" : "id",
            "partition_num" : 8
        }
    ],
    "sink" : [
        {
            "plugin_name" : "Hive",
            "table_name" : "tmp.tmp_dcp_dw_source_ods_wxzj_t_keywords_message_wyqy_seatunnel_3000983",
            "metastore_uri" : "thrift://*:9083"
        }
    ]
}

SbloodyS avatar Apr 19 '24 06:04 SbloodyS

When I set the parallelism to 1, the problem still exists...

SbloodyS avatar Apr 19 '24 07:04 SbloodyS