seatunnel
[fix] [connector-tdengine] Fix SQL exception and ConcurrentModificationException when connecting to taos and reading data
Purpose of this pull request
close https://github.com/apache/seatunnel/issues/6032 close https://github.com/apache/seatunnel/issues/5998
Does this PR introduce any user-facing change?
no
How was this patch tested?
Tested with seatunnel-engine-example; the application log is below:
2023-12-26 16:12:21,967 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - checkpoint is disabled, because in batch mode and 'checkpoint.interval' of env is missing.
2023-12-26 16:12:22,001 INFO org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask - received enough reader, starting enumerator...
2023-12-26 16:12:22,011 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplitEnumerator - Assign pendingSplits to readers [0]
2023-12-26 16:12:22,012 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplitEnumerator - Assign splits [org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit@6ace399c, org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit@432a36dc, org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit@7669b2b7, org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit@7599e205] to reader 0
2023-12-26 16:12:22,020 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplitEnumerator - No more splits to assign. Sending NoMoreSplitsEvent to reader [0].
2023-12-26 16:12:22,021 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - no more split accepted!
2023-12-26 16:12:22,061 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - polling new split from queue!
2023-12-26 16:12:22,062 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - starting run new split d_1, query sql: select `ts`,`s_1`,`s_2`,`city_code` from test.d_1!
2023-12-26 16:12:22,169 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - polling new split from queue!
2023-12-26 16:12:22,169 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - starting run new split d_2, query sql: select `ts`,`s_1`,`s_2`,`city_code` from test.d_2!
2023-12-26 16:12:22,171 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_1, 2023-12-26T13:43:54.609, 1.0, 2.0, HF
2023-12-26 16:12:22,171 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_1, 2023-12-26T13:44:05.303, 1.1, 2.0, HF
2023-12-26 16:12:22,171 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_1, 2023-12-26T13:44:09.812, 1.2, 2.1, HF
2023-12-26 16:12:22,181 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_2, 2023-12-26T13:44:24.184, 1.1, 2.2, HF
2023-12-26 16:12:22,181 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_2, 2023-12-26T13:44:27.687, 1.2, 2.2, HF
2023-12-26 16:12:22,181 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_2, 2023-12-26T13:44:32.231, 1.3, 2.3, HF
2023-12-26 16:12:22,184 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - polling new split from queue!
2023-12-26 16:12:22,184 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - starting run new split d_4, query sql: select `ts`,`s_1`,`s_2`,`city_code` from test.d_4!
2023-12-26 16:12:22,193 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_4, 2023-12-26T13:45:21.366, 1.1, 2.1, HF
2023-12-26 16:12:22,193 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_4, 2023-12-26T13:45:24.381, 1.2, 2.1, HF
2023-12-26 16:12:22,193 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_4, 2023-12-26T13:45:26.941, 1.3, 2.1, HF
2023-12-26 16:12:22,193 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_4, 2023-12-26T13:45:30.452, 1.4, 2.2, HF
2023-12-26 16:12:22,197 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - polling new split from queue!
2023-12-26 16:12:22,197 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - starting run new split d_3, query sql: select `ts`,`s_1`,`s_2`,`city_code` from test.d_3!
2023-12-26 16:12:22,222 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_3, 2023-12-26T13:44:43.803, 1.0, 2.0, HF
2023-12-26 16:12:22,223 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_3, 2023-12-26T13:44:49.397, 1.1, 2.0, HF
2023-12-26 16:12:22,223 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_3, 2023-12-26T13:44:52.932, 1.2, 2.1, HF
2023-12-26 16:12:22,237 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - polling new split from queue!
2023-12-26 16:12:22,237 INFO org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - Closed the bounded jdbc source
2023-12-26 16:12:22,361 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 9223372036854775807
2023-12-26 16:12:23,021 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(9223372036854775807/1@791940519440678913) notify finished!
2023-12-26 16:12:23,021 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, job id: 791940519440678913, pipeline id: 1, checkpoint id:9223372036854775807
2023-12-26 16:12:23,043 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start clean pending checkpoint cause CheckpointCoordinator completed.
2023-12-26 16:12:23,054 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - Turn checkpoint_state_791940519440678913_1 state from null to FINISHED
2023-12-26 16:12:23,126 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-670882] [5.1] taskDone, taskId = 50000, taskGroup = TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000}
2023-12-26 16:12:23,149 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-670882] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=1}
2023-12-26 16:12:23,150 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-670882] [5.1] Task TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=1} complete with state FINISHED
2023-12-26 16:12:23,150 INFO org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-670882] [5.1] Received task end from execution TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=1}, state FINISHED
2023-12-26 16:12:23,155 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-TDengine]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=1}] turned from state RUNNING to FINISHED.
2023-12-26 16:12:23,156 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-TDengine]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=1}] state process is stopped
2023-12-26 16:12:23,157 INFO org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-TDengine]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=1}] future complete with state FINISHED
2023-12-26 16:12:23,157 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-670882] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000}
2023-12-26 16:12:23,158 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-670882] [5.1] Task TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000} complete with state FINISHED
2023-12-26 16:12:23,158 INFO org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-670882] [5.1] Received task end from execution TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000}, state FINISHED
2023-12-26 16:12:23,162 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-TDengine]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000}] turned from state RUNNING to FINISHED.
2023-12-26 16:12:23,163 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-TDengine]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000}] state process is stopped
2023-12-26 16:12:23,163 INFO org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-TDengine]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000}] future complete with state FINISHED
2023-12-26 16:12:23,163 INFO org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)] will end with state FINISHED
2023-12-26 16:12:23,163 INFO org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)] turned from state RUNNING to FINISHED.
2023-12-26 16:12:23,207 INFO org.apache.seatunnel.engine.server.master.JobMaster - release the pipeline Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)] resource
2023-12-26 16:12:23,215 INFO org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 791940519440678913, slot: SlotProfile{worker=[localhost]:5801, slotID=2, ownerJobID=791940519440678913, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='19c5a1c0-2b4e-4dea-968a-cc15ff536616'}
2023-12-26 16:12:23,215 INFO org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 791940519440678913, slot: SlotProfile{worker=[localhost]:5801, slotID=1, ownerJobID=791940519440678913, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='19c5a1c0-2b4e-4dea-968a-cc15ff536616'}
2023-12-26 16:12:23,215 INFO org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)] state process is stop
2023-12-26 16:12:23,216 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)] future complete with state FINISHED
2023-12-26 16:12:23,216 DEBUG org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Try to update the Job fake_to_console.conf (791940519440678913) state from RUNNING to FINISHED
2023-12-26 16:12:23,216 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job fake_to_console.conf (791940519440678913) turned from state RUNNING to FINISHED.
2023-12-26 16:12:23,217 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job fake_to_console.conf (791940519440678913) state process is stop
2023-12-26 16:12:23,251 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (791940519440678913) end with state FINISHED
2023-12-26 16:12:23,401 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand -
***********************************************
Job Statistic Information
***********************************************
Start Time : 2023-12-26 16:08:53
End Time : 2023-12-26 16:12:23
Total Time(s) : 209
Total Read Count : 13
Total Write Count : 13
Total Failed Count : 0
***********************************************
Check list
- [ ] If your PR adds any new Jar binary package, please add a License Notice according to the New License Guide
- [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
- [ ] If you are contributing the connector code, please check that the following files are updated:
- Update the change log in the connector document. For more details, refer to connector-v2
- Update plugin-mapping.properties and add new connector information in it
- Update the pom file of seatunnel-dist
- [ ] Update the release-note.
e2e test failed
Please try merging from dev.
@Hisoka-X I checked out dev and still got the same error.
Did you try rebuilding the source code? SeaTunnel needs to be rebuilt before running e2e so that the jar is refreshed.
@Hisoka-X I rebuilt the project and still got the same error.
command: mvn package -pl seatunnel-dist -am -Dmaven.test.skip=true
Your commit list needs to be cleaned up
@hailin0 PTAL
Can you provide some test cases?
@liugddx done
Please fix ci.
How did you fix this?
No suitable driver found for jdbc:TAOS-RS://localhost:6041/test?user=root&password=taosdata
Already fixed in a previous commit by others; see lines 134-135:
// check td driver whether exist and if not, try to register
checkDriverExist(jdbcUrl);
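For readers unfamiliar with the `No suitable driver found` error: the `checkDriverExist(jdbcUrl)` call mentioned above guards against it by making sure a JDBC driver is registered before connecting. The following is only a rough sketch of that kind of check, not the connector's actual implementation. The class `DriverCheck`, the method `ensureDriver`, and passing the driver class name as a parameter are hypothetical names chosen for illustration; the default class name used in `main` is assumed to be the REST driver shipped in taos-jdbcdriver.

```java
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical helper, for illustration only; not the SeaTunnel code.
final class DriverCheck {

    /**
     * Returns true if a driver accepting jdbcUrl is already registered,
     * or could be loaded from driverClassName and registered.
     */
    static boolean ensureDriver(String jdbcUrl, String driverClassName) {
        try {
            // Throws SQLException ("No suitable driver") if nothing matches the URL.
            DriverManager.getDriver(jdbcUrl);
            return true;
        } catch (SQLException notRegistered) {
            // Fall through and try to register the driver explicitly.
        }
        try {
            Driver driver = (Driver) Class.forName(driverClassName)
                    .getDeclaredConstructor()
                    .newInstance();
            DriverManager.registerDriver(driver);
            // Confirm that the newly registered driver accepts this URL.
            DriverManager.getDriver(jdbcUrl);
            return true;
        } catch (ReflectiveOperationException | ClassCastException | SQLException e) {
            // Class missing from the classpath, not a JDBC driver, or URL not accepted.
            return false;
        }
    }

    public static void main(String[] args) {
        String url = args.length > 0 ? args[0] : "jdbc:TAOS-RS://localhost:6041/test";
        // Assumed driver class name; adjust to the driver actually on the classpath.
        String driverClass = args.length > 1 ? args[1] : "com.taosdata.jdbc.rs.RestfulDriver";
        System.out.println("driver available: " + ensureDriver(url, driverClass));
    }
}
```

If the driver jar is missing from the classpath, `ensureDriver` returns `false` instead of throwing, so the caller can report a clearer error than the one raised later by `DriverManager.getConnection`.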