[Bug] [connector-kafka] Kafka source fails to commit offsets and cannot consume data

Open Xuzhengz opened this issue 1 year ago • 0 comments

Search before asking

  • [X] I have searched the issues and found no similar issues.

What happened

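The Kafka source fails to commit offsets and cannot consume data. In the log below, checkpoints keep completing, but the Kafka consumer disconnects from the group coordinator (172.16.8.202:19092) with a socket connection setup timeout.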

SeaTunnel Version

2.3.8-release

SeaTunnel Config

{
  "env": {
    "parallelism": 1,
    "job.mode": "STREAMING",
    "checkpoint.interval": "5000",
    "job.retry.times": 1
  },
  "source": [
    {
      "plugin_name": "Kafka",
      "parallelism": 1,
      "bootstrap.servers": "172.16.8.200:19092,172.16.8.201:19092,172.16.8.202:19092",
      "topic": "ocean",
      "schema": {
        "fields": {
          "name": "string",
          "id": "int"
        }
      },
      "start_mode": "latest",
      "format_error_handle_way": "skip",
      "format": "json",
      "result_table_name": "jsonPath"
    }
  ],
  "transform": [
    {
      "plugin_name": "JsonPath",
      "columns": [
        {
          "src_field": "name",
          "path": "$.name1",
          "dest_field": "name1"
        },
        {
          "src_field": "name",
          "path": "$.name2",
          "dest_field": "name2"
        },
        {
          "src_field": "name",
          "path": "$.name3",
          "dest_field": "name3",
          "dest_type": "double"
        },
        {
          "src_field": "name",
          "path": "$.name4",
          "dest_field": "name4",
          "dest_type": "boolean"
        },
        {
          "src_field": "name",
          "path": "$.name5.name55",
          "dest_field": "name55"
        }
      ],
      "result_table_name": "console",
      "source_table_name": [
        "jsonPath"
      ]
    }
  ],
  "sink": [
    {
      "source_table_name": "console",
      "plugin_name": "Console"
    }
  ]
}

Running Command

bin/seatunnel.sh -c demo.json

Error Exception

[898453163625938945] 2024-10-15 14:12:33,886 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 5
[898453163625938945] 2024-10-15 14:12:33,897 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(5/1@898453163625938945) notify finished!
[898453163625938945] 2024-10-15 14:12:33,897 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, job id: 898453163625938945, pipeline id: 1, checkpoint id:5
[898453163625938945] 2024-10-15 14:12:38,898 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 6
[898453163625938945] 2024-10-15 14:12:38,942 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(6/1@898453163625938945) notify finished!
[898453163625938945] 2024-10-15 14:12:38,942 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, job id: 898453163625938945, pipeline id: 1, checkpoint id:6
[] 2024-10-15 14:12:38,951 INFO  org.apache.kafka.clients.NetworkClient - [Consumer clientId=seatunnel-consumer-0, groupId=SeaTunnel-Consumer-Group] Disconnecting from node 2147483644 due to socket connection setup timeout. The timeout value is 10756 ms.
[] 2024-10-15 14:12:38,956 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=seatunnel-consumer-0, groupId=SeaTunnel-Consumer-Group] Group coordinator 172.16.8.202:19092 (id: 2147483644 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
[898453163625938945] 2024-10-15 14:12:43,908 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 7
[898453163625938945] 2024-10-15 14:12:43,918 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(7/1@898453163625938945) notify finished!
[898453163625938945] 2024-10-15 14:12:43,918 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, job id: 898453163625938945, pipeline id: 1, checkpoint id:7
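
The log above shows the consumer disconnecting from the group coordinator at 172.16.8.202:19092 because of a socket connection setup timeout. One thing that may be worth ruling out first is plain TCP reachability from the SeaTunnel host to every broker in `bootstrap.servers`. The following is a minimal diagnostic sketch, not part of SeaTunnel or the Kafka client; the helper names `parse_bootstrap_servers` and `can_connect` are hypothetical and use only the Python standard library.

```python
import socket


def parse_bootstrap_servers(value):
    """Split a 'host:port,host:port' string into a list of (host, port) tuples."""
    endpoints = []
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue
        # rpartition splits on the last colon, so the port is always the tail.
        host, _, port = item.rpartition(":")
        endpoints.append((host, int(port)))
    return endpoints


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable networks.
        return False
```

For example, `[ep for ep in parse_bootstrap_servers("172.16.8.200:19092,172.16.8.201:19092,172.16.8.202:19092") if not can_connect(*ep)]` would list the endpoints that did not accept a connection. A successful TCP connect only shows that the port is open; it does not confirm that the address each broker advertises to clients is the one reachable from this host.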

Zeta or Flink or Spark Version

No response

Java or Scala Version

1.8

Screenshots

(two images, not reproduced)

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

Xuzhengz, Oct 15 '24 06:10