[Bug] [JDBC] JDBC Source loses data, occasionally reoccurs
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
Data migration is performed by submitting jobs through the REST API, and jobs run almost continuously. Each job moves no more than 20,000 rows. Data loss occurs roughly every 4-5 hours and reproduces reliably.
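For reference, this is roughly how each batch job is submitted. A minimal sketch, assuming the default Zeta REST API v1 endpoint (/hazelcast/rest/maps/submit-job on port 5801) and a local job.json file holding the config shown below; the host and jobName values are placeholders, not taken from our environment.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class SubmitJob {
    public static void main(String[] args) throws Exception {
        // Job config is the JSON from the "SeaTunnel Config" section below.
        String config = new String(Files.readAllBytes(Paths.get("job.json")), StandardCharsets.UTF_8);
        // Placeholder host; 5801 is the default Zeta network port.
        URL url = new URL("http://localhost:5801/hazelcast/rest/maps/submit-job?jobName=work_order_detail");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(config.getBytes(StandardCharsets.UTF_8));
        }
        // A 200 response body carries the assigned jobId and jobName.
        System.out.println("HTTP " + conn.getResponseCode());
    }
}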
SeaTunnel Version
2.3.5
SeaTunnel Config
There are more than 200 tables; for brevity, only one is shown here.
{
  "env": {
    "parallelism": "1",
    "job.mode": "BATCH",
    "checkpoint.interval": "10000",
    "checkpoint.timeout": "60000",
    "execution.checkpoint.interval": "10000",
    "read_limit.rows_per_second": null,
    "read_limit.bytes_per_second": null,
    "job.retry.times": 3
  },
  "source": [{
    "tableInfo": {
      "columnName": "updated_time",
      "dataType": 93,
      "tableName": "work_order_detail",
      "primaryKeys": [{
        "primaryKey": "id",
        "dataType": -5,
        "autoColumn": true
      }],
      "tableEarliestTime": "1719193899073"
    },
    "password": "P@ssw0rd",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    "query": "select * from work_order_detail where updated_time >= %s and updated_time <= %s",
    "result_table_name": "switch_work_order_detail",
    "plugin_name": "Jdbc",
    "user": "sa",
    "url": "jdbc:sqlserver://118.118.1.127:1433;DatabaseName=hs_test;encrypt=false;sendStringParametersAsUnicode=false"
  }],
  "transform": [{
    "tableInfo": {
      "columnName": "updated_time",
      "dataType": 93,
      "tableName": "work_order_detail",
      "primaryKeys": [{
        "primaryKey": "id",
        "dataType": -5,
        "autoColumn": true
      }],
      "tableEarliestTime": "1719193899073"
    },
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    "query": "select * from switch_work_order_detail where updated_time >= %s and updated_time <= %s",
    "source_table_name": "switch_work_order_detail",
    "result_table_name": "work_order_detail",
    "plugin_name": "Sql"
  }],
  "sink": [{
    "password": "P@ssw0rd",
    "database": "hs_target",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    "query": "SET IDENTITY_INSERT [work_order_detail] ON;MERGE INTO work_order_detail AS target USING (VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)) AS source ([id],[work_order_id],[plant_id],[description],[created_by],[created_time],[updated_by],[updated_time],[trx_id],[creation_pid],[update_pid],[version],[process_task_number],[task_status],[station_code],[tray_id],[serial_number],[laser_number],[out_okng],[is_last_operation],[parses_status],[rework_times],[sample],[next_rework_station]) ON (target.[id] = source.[id]) WHEN MATCHED THEN UPDATE SET target.[work_order_id] =source.[work_order_id],target.[plant_id] =source.[plant_id],target.[description] =source.[description],target.[created_by] =source.[created_by],target.[created_time] =source.[created_time],target.[updated_by] =source.[updated_by],target.[updated_time] =source.[updated_time],target.[trx_id] =source.[trx_id],target.[creation_pid] =source.[creation_pid],target.[update_pid] =source.[update_pid],target.[version] =source.[version],target.[process_task_number] =source.[process_task_number],target.[task_status] =source.[task_status],target.[station_code] =source.[station_code],target.[tray_id] =source.[tray_id],target.[serial_number] =source.[serial_number],target.[laser_number] =source.[laser_number],target.[out_okng] =source.[out_okng],target.[is_last_operation] =source.[is_last_operation],target.[parses_status] =source.[parses_status],target.[rework_times] =source.[rework_times],target.[sample] =source.[sample],target.[next_rework_station] =source.[next_rework_station] WHEN NOT MATCHED THEN INSERT ([id],[work_order_id],[plant_id],[description],[created_by],[created_time],[updated_by],[updated_time],[trx_id],[creation_pid],[update_pid],[version],[process_task_number],[task_status],[station_code],[tray_id],[serial_number],[laser_number],[out_okng],[is_last_operation],[parses_status],[rework_times],[sample],[next_rework_station]) VALUES (source.[id],source.[work_order_id],source.[plant_id],source.[description],source.[created_by],source.[created_time],source.[updated_by],source.[updated_time],source.[trx_id],source.[creation_pid],source.[update_pid],source.[version],source.[process_task_number],source.[task_status],source.[station_code],source.[tray_id],source.[serial_number],source.[laser_number],source.[out_okng],source.[is_last_operation],source.[parses_status],source.[rework_times],source.[sample],source.[next_rework_station]);SET IDENTITY_INSERT [work_order_detail] OFF;",
    "source_table_name": "work_order_detail",
    "plugin_name": "Jdbc",
    "user": "sa",
    "url": "jdbc:sqlserver://118.118.1.127:1433;DatabaseName=hs_target;encrypt=false;sendStringParametersAsUnicode=false"
  }]
}
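To make the source window concrete: the two %s placeholders in the source query receive epoch-millisecond bounds, and tableEarliestTime (1719193899073) appears to be the initial lower bound. A minimal sketch of the substitution, assuming the upper bound is simply the submission time (the exact upper bound the migration service passes is not visible in the config):

public class EffectiveQuery {
    public static void main(String[] args) {
        String template = "select * from work_order_detail where updated_time >= %s and updated_time <= %s";
        long lower = 1719193899073L;             // tableEarliestTime from the config above
        long upper = System.currentTimeMillis(); // assumption: window closes at submission time
        // e.g. select * from work_order_detail where updated_time >= 1719193899073 and updated_time <= <now>
        System.out.println(String.format(template, lower, upper));
    }
}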
Running Command
java -Xms2G -Xmx4G -Dseatunnel.config=/usr/local/src/apache-seatunnel-2.3.5/config/seatunnel.yaml -Dhazelcast.config=/usr/local/src/apache-seatunnel-2.3.5/config/hazelcast.yaml -Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -Dlog4j2.configurationFile=/usr/local/src/apache-seatunnel-2.3.5/config/log4j2.properties -Dseatunnel.logs.path=/usr/local/src/apache-seatunnel-2.3.5/logs -Dseatunnel.logs.file_name=seatunnel-engine-server -Xms2g -Xmx2g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server -XX:MaxMetaspaceSize=2g -XX:+UseG1GC -cp /usr/local/src/apache-seatunnel-2.3.5/lib/*:/usr/local/src/apache-seatunnel-2.3.5/starter/seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer -d
Error Exception
No exception was thrown.
Zeta or Flink or Spark Version
zeta
Java or Scala Version
1.8
Screenshots
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct