
[Bug] [JDBC] JDBC Source lost data, occasionally reoccur

Open — wangzhiwei61 opened this issue 7 months ago • 1 comment

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

Data migration is performed through the REST API and runs almost continuously. Each run moves at most 20,000 rows. Data loss occurs every 4-5 hours and reproduces consistently.
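For context, each source reads a table with a closed time window ("updated_time >= %s and updated_time <= %s" in the config below). The sketch below is hypothetical illustration, not SeaTunnel code: it shows contiguous closed windows over epoch-millisecond timestamps, where the next window must start at the previous upper bound + 1 ms. A gap between windows, or a row whose timestamp falls inside an already-read window but commits late, would be lost silently with no exception — consistent with the symptom reported here.

```python
# Hypothetical sketch of closed-interval time windowing over an
# epoch-millisecond updated_time column. Not SeaTunnel code.

def windows(start_ms, end_ms, step_ms):
    """Yield consecutive closed windows [lo, hi] covering [start_ms, end_ms].
    The next window starts at hi + 1 so the boundary row is read exactly once;
    starting at hi + 2 (a gap) or hi (an overlap) loses or duplicates rows."""
    lo = start_ms
    while lo <= end_ms:
        hi = min(lo + step_ms - 1, end_ms)
        yield lo, hi
        lo = hi + 1

# Rows keyed by updated_time in milliseconds (values are row ids).
rows = {1719193899073: "r1", 1719193899074: "r2", 1719193899080: "r3"}

seen = []
for lo, hi in windows(1719193899073, 1719193899081, step_ms=4):
    seen += [r for t, r in rows.items() if lo <= t <= hi]

# Contiguous windows cover every row; a row committed *after* its window
# was read would never be picked up again by this scheme.
assert sorted(seen) == ["r1", "r2", "r3"]
```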

SeaTunnel Version

2.3.5

SeaTunnel Config

There are more than 200 tables; for brevity, only one is shown here.
{
	"env": {
		"parallelism": "1",
		"job.mode": "BATCH",
		"checkpoint.interval": "10000",
		"checkpoint.timeout": "60000",
		"execution.checkpoint.interval": "10000",
		"read_limit.rows_per_second": null,
		"read_limit.bytes_per_second": null,
		"job.retry.times": 3
	},
	"source": [{
		"tableInfo": {
			"columnName": "updated_time",
			"dataType": 93,
			"tableName": "work_order_detail",
			"primaryKeys": [{
				"primaryKey": "id",
				"dataType": -5,
				"autoColumn": true
			}],
			"tableEarliestTime": "1719193899073"
		},
		"password": "P@ssw0rd",
		"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
		"query": "select * from work_order_detail  where updated_time >= %s and  updated_time <= %s",
		"result_table_name": "switch_work_order_detail",
		"plugin_name": "Jdbc",
		"user": "sa",
		"url": "jdbc:sqlserver://118.118.1.127:1433;DatabaseName=hs_test;encrypt=false;sendStringParametersAsUnicode=false"
	}],
	"transform": [{
		"tableInfo": {
			"columnName": "updated_time",
			"dataType": 93,
			"tableName": "work_order_detail",
			"primaryKeys": [{
				"primaryKey": "id",
				"dataType": -5,
				"autoColumn": true
			}],
			"tableEarliestTime": "1719193899073"
		},
		"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
		"query": "select * from switch_work_order_detail  where updated_time >= %s and  updated_time <= %s",
		"source_table_name": "switch_work_order_detail",
		"result_table_name": "work_order_detail",
		"plugin_name": "Sql"
	}],
	"sink": [{
		"password": "P@ssw0rd",
		"database": "hs_target",
		"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
		"query": "SET IDENTITY_INSERT [work_order_detail] ON;MERGE INTO work_order_detail AS target USING (VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)) AS source ([id],[work_order_id],[plant_id],[description],[created_by],[created_time],[updated_by],[updated_time],[trx_id],[creation_pid],[update_pid],[version],[process_task_number],[task_status],[station_code],[tray_id],[serial_number],[laser_number],[out_okng],[is_last_operation],[parses_status],[rework_times],[sample],[next_rework_station]) ON (target.[id] = source.[id]) WHEN MATCHED THEN     UPDATE SET target.[work_order_id] =source.[work_order_id],target.[plant_id] =source.[plant_id],target.[description] =source.[description],target.[created_by] =source.[created_by],target.[created_time] =source.[created_time],target.[updated_by] =source.[updated_by],target.[updated_time] =source.[updated_time],target.[trx_id] =source.[trx_id],target.[creation_pid] =source.[creation_pid],target.[update_pid] =source.[update_pid],target.[version] =source.[version],target.[process_task_number] =source.[process_task_number],target.[task_status] =source.[task_status],target.[station_code] =source.[station_code],target.[tray_id] =source.[tray_id],target.[serial_number] =source.[serial_number],target.[laser_number] =source.[laser_number],target.[out_okng] =source.[out_okng],target.[is_last_operation] =source.[is_last_operation],target.[parses_status] =source.[parses_status],target.[rework_times] =source.[rework_times],target.[sample] =source.[sample],target.[next_rework_station] =source.[next_rework_station] WHEN NOT MATCHED THEN     INSERT ([id],[work_order_id],[plant_id],[description],[created_by],[created_time],[updated_by],[updated_time],[trx_id],[creation_pid],[update_pid],[version],[process_task_number],[task_status],[station_code],[tray_id],[serial_number],[laser_number],[out_okng],[is_last_operation],[parses_status],[rework_times],[sample],[next_rework_station]) VALUES 
(source.[id],source.[work_order_id],source.[plant_id],source.[description],source.[created_by],source.[created_time],source.[updated_by],source.[updated_time],source.[trx_id],source.[creation_pid],source.[update_pid],source.[version],source.[process_task_number],source.[task_status],source.[station_code],source.[tray_id],source.[serial_number],source.[laser_number],source.[out_okng],source.[is_last_operation],source.[parses_status],source.[rework_times],source.[sample],source.[next_rework_station]);SET IDENTITY_INSERT [work_order_detail] OFF;",
		"source_table_name": "work_order_detail",
		"plugin_name": "Jdbc",
		"user": "sa",
		"url": "jdbc:sqlserver://118.118.1.127:1433;DatabaseName=hs_target;encrypt=false;sendStringParametersAsUnicode=false"
	}]
}
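One thing worth checking with this config: the window bounds are epoch milliseconds (e.g. "tableEarliestTime": "1719193899073"), but updated_time is a TIMESTAMP column (JDBC dataType 93). The hypothetical helper below (not SeaTunnel code) shows how a boundary value converts to a SQL Server datetime literal, and how truncating the millisecond part during conversion would shift the window edge — rows in the truncated sub-second range could then fall outside both the previous and the next window.

```python
# Hypothetical conversion of an epoch-millisecond window bound to a
# SQL datetime literal. Not SeaTunnel code -- illustrates how dropping
# milliseconds moves the window boundary.
from datetime import datetime, timedelta, timezone

def ms_to_sql_literal(ms, keep_millis=True):
    """Render an epoch-ms bound as 'YYYY-MM-DD HH:MM:SS[.mmm]' (UTC)."""
    dt = datetime.fromtimestamp(ms // 1000, tz=timezone.utc) \
        + timedelta(milliseconds=ms % 1000)
    if keep_millis:
        # %f is microseconds (6 digits); trim to millisecond precision.
        return dt.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    return dt.strftime("%Y-%m-%d %H:%M:%S")

# The config's earliest-time bound, with and without milliseconds:
assert ms_to_sql_literal(1719193899073) == "2024-06-24 01:51:39.073"
assert ms_to_sql_literal(1719193899073, keep_millis=False) == "2024-06-24 01:51:39"
```

If the conversion anywhere in the pipeline behaves like keep_millis=False, an "updated_time <= boundary" predicate excludes rows in the .001-.999 range of that second, and the next window (starting strictly after the boundary) never revisits them.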

Running Command

java -Xms2G -Xmx4G -Dseatunnel.config=/usr/local/src/apache-seatunnel-2.3.5/config/seatunnel.yaml -Dhazelcast.config=/usr/local/src/apache-seatunnel-2.3.5/config/hazelcast.yaml -Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -Dlog4j2.configurationFile=/usr/local/src/apache-seatunnel-2.3.5/config/log4j2.properties -Dseatunnel.logs.path=/usr/local/src/apache-seatunnel-2.3.5/logs -Dseatunnel.logs.file_name=seatunnel-engine-server -Xms2g -Xmx2g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server -XX:MaxMetaspaceSize=2g -XX:+UseG1GC -cp /usr/local/src/apache-seatunnel-2.3.5/lib/*:/usr/local/src/apache-seatunnel-2.3.5/starter/seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer -d

Error Exception

No Exception

Zeta or Flink or Spark Version

zeta

Java or Scala Version

1.8

Screenshots

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

wangzhiwei61 avatar Jun 27 '24 01:06 wangzhiwei61