seatunnel
seatunnel copied to clipboard
[Bug] [connector-kafka] kafka source commit offset fail and can not consume data
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
kafka source commit offset fail and can not consume data
SeaTunnel Version
2.3.8-release
SeaTunnel Config
{
"env": {
"parallelism": 1,
"job.mode": "STREAMING",
"checkpoint.interval": "5000",
"job.retry.times": 1
},
"source": [
{
"plugin_name": "Kafka",
"parallelism": 1,
"bootstrap.servers": "172.16.8.200:19092,172.16.8.201:19092,172.16.8.202:19092",
"topic": "ocean",
"schema": {
"fields": {
"name": "string",
"id": "int"
}
},
"start_mode": "latest",
"format_error_handle_way": "skip",
"format": "json",
"result_table_name": "jsonPath"
}
],
"transform": [
{
"plugin_name": "JsonPath",
"columns": [
{
"src_field": "name",
"path": "$.name1",
"dest_field": "name1"
},
{
"src_field": "name",
"path": "$.name2",
"dest_field": "name2"
},
{
"src_field": "name",
"path": "$.name3",
"dest_field": "name3",
"dest_type": "double"
},
{
"src_field": "name",
"path": "$.name4",
"dest_field": "name4",
"dest_type": "boolean"
},
{
"src_field": "name",
"path": "$.name5.name55",
"dest_field": "name55"
}
],
"result_table_name": "console",
"source_table_name": [
"jsonPath"
]
}
],
"sink": [
{
"source_table_name": "console",
"plugin_name": "Console"
}
]
}
Running Command
bin/seatunnel.sh -c demo.json
Error Exception
[898453163625938945] 2024-10-15 14:12:33,886 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 5
[898453163625938945] 2024-10-15 14:12:33,897 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(5/1@898453163625938945) notify finished!
[898453163625938945] 2024-10-15 14:12:33,897 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, job id: 898453163625938945, pipeline id: 1, checkpoint id:5
[898453163625938945] 2024-10-15 14:12:38,898 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 6
[898453163625938945] 2024-10-15 14:12:38,942 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(6/1@898453163625938945) notify finished!
[898453163625938945] 2024-10-15 14:12:38,942 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, job id: 898453163625938945, pipeline id: 1, checkpoint id:6
[] 2024-10-15 14:12:38,951 INFO org.apache.kafka.clients.NetworkClient - [Consumer clientId=seatunnel-consumer-0, groupId=SeaTunnel-Consumer-Group] Disconnecting from node 2147483644 due to socket connection setup timeout. The timeout value is 10756 ms.
[] 2024-10-15 14:12:38,956 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=seatunnel-consumer-0, groupId=SeaTunnel-Consumer-Group] Group coordinator 172.16.8.202:19092 (id: 2147483644 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
[898453163625938945] 2024-10-15 14:12:43,908 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 7
[898453163625938945] 2024-10-15 14:12:43,918 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(7/1@898453163625938945) notify finished!
[898453163625938945] 2024-10-15 14:12:43,918 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, job id: 898453163625938945, pipeline id: 1, checkpoint id:7
Zeta or Flink or Spark Version
No response
Java or Scala Version
1.8
Screenshots
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct