snowflake-kafka-connector

Read load history uses a 1 hr window from a previous month, and the consumer gets into a rebalancing loop

Open nkkrishnakfk opened this issue 2 years ago • 1 comments

Kafka Connect: cp-kafka-connect-base:6.2.0
Libs: snowflake-kafka-connector-1.2.3.jar

Current date while debugging: 2022-07-05. Log:

[SF_KAFKA_CONNECTOR] read load history between 2022-05-25T18:53:45Z and 2022-05-25T19:53:45Z. retrieved 0 records. (com.snowflake.kafka.connector.internal.SnowflakeIngestionServiceV1)
[2022-07-05 05:38:07,996] INFO [Consumer clientId=connector-consumer-load-0, groupId=connect-load] Member connector-consumer-load-0-15ee0dfd-ea2d-4cd8-be53-36d0e3ecbd87 sending LeaveGroup request to coordinator <hostname>:9092 (id: 2147482644 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 79 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 98 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] read load history between 2022-05-25T18:53:45Z and 2022-05-25T19:53:45Z. retrieved 0 records. (com.snowflake.kafka.connector.internal.SnowflakeIngestionServiceV1)

This in turn triggers rebalancing, and the cycle keeps repeating. Note: I tried reducing max.poll.records from 500 to 100, but the issue persists.

I don't understand why this line keeps repeating: "read load history between 2022-05-25T18:53:45Z and 2022-05-25T19:53:45Z. retrieved 0 records".
Code ref: https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeIngestionServiceV1.java#L221
Isn't this supposed to take the last 1 hr from the current time? Note: this is happening with only one consumer/partition of the topic, the one that has a huge lag.

TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID                                                            HOST           CLIENT-ID
events-sync  0          6408022         25524319        19116297  connector-consumer-events-sync-0-b9142c5f-3bb7-47b1-bd44-a169a7984952  /xx.xx.xx.101  connector-consumer-events-sync-0
events-sync  1          25521059        25521202        143       connector-consumer-events-sync-1-107f2aa8-969c-4d7e-87f8-fdb2be2480b3  /xx.xx.xx.102  connector-consumer-events-sync-1

It's only happening in partition 0, which has a huge lag.
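As a quick sanity check on the timestamps in the log, the following sketch computes the length of the logged load-history window and how far it lies behind the debugging date. It assumes the debugging date is midnight UTC on 2022-07-05 (the log's own time zone is not stated), and `parse_utc` is a helper name introduced only for this illustration.

```python
from datetime import datetime, timedelta, timezone


def parse_utc(ts: str) -> datetime:
    """Parse a timestamp of the form printed in the connector log (trailing Z = UTC)."""
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)


# Window boundaries copied from the "read load history between ..." log line.
window_start = parse_utc("2022-05-25T18:53:45Z")
window_end = parse_utc("2022-05-25T19:53:45Z")

# Assumption: the debugging date is taken as midnight UTC.
debug_time = datetime(2022, 7, 5, tzinfo=timezone.utc)

window_length = window_end - window_start  # how long the queried window is
window_age = debug_time - window_end       # how far in the past the window ends

print("window length:", window_length)
print("window ends", window_age.days, "days before the debugging date")
```

Under these assumptions the window is exactly one hour long but ends about 40 days before the debugging date, so the query is not anchored to the current time.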

Below are the settings used:

  CONNECT_AUTO_OFFSET_RESET: "latest"
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
  CONNECT_CONFIG_STORAGE_TOPIC: "snowflakesync-config"
  CONNECT_CONSUMER_MAX_POLL_RECORDS: "100"
  CONNECT_CONSUMER_SESSION_TIMEOUT_MS: "30000"
  CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_LOG4J_LOGGERS: "org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR"
  CONNECT_MAX_POLL_INTERVAL_MS: "800000"
  CONNECT_MAX_POLL_RECORDS: "100"
  CONNECT_OFFSET_FLUSH_INTERVAL_MS: "10000"
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
  CONNECT_OFFSET_STORAGE_TOPIC: "snowflakesync-offset"
  CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
  CONNECT_REST_PORT: "8083"
  CONNECT_SESSION_TIMEOUT_MS: "30000"
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
  CONNECT_STATUS_STORAGE_TOPIC: "prod-snowflakesync-status-stop"
  CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
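A point worth checking in the settings above is which of them actually reach the sink task's consumer. Kafka Connect applies worker properties prefixed with `consumer.` to the consumers of sink tasks. The sketch below assumes the simple form of the Confluent image convention (strip `CONNECT_`, lowercase, underscores become dots) and ignores its escaping rules for dashes and underscores. The function names are hypothetical helpers for this illustration, not part of any library.

```python
def to_worker_property(env_name: str, prefix: str = "CONNECT_") -> str:
    """Map an environment variable name to a worker property name.

    Assumption: simple convention only (prefix stripped, lowercased,
    "_" -> "."); escape sequences for "-" and "_" are not handled.
    """
    if not env_name.startswith(prefix):
        raise ValueError(f"{env_name!r} does not start with {prefix!r}")
    return env_name[len(prefix):].lower().replace("_", ".")


def is_consumer_override(env_name: str) -> bool:
    """True if the resulting property carries the consumer. prefix."""
    return to_worker_property(env_name).startswith("consumer.")


# Poll-related settings copied from the list above.
poll_settings = [
    "CONNECT_CONSUMER_MAX_POLL_RECORDS",
    "CONNECT_CONSUMER_SESSION_TIMEOUT_MS",
    "CONNECT_MAX_POLL_INTERVAL_MS",
    "CONNECT_MAX_POLL_RECORDS",
    "CONNECT_SESSION_TIMEOUT_MS",
]

for name in poll_settings:
    scope = "consumer override" if is_consumer_override(name) else "worker-level"
    print(f"{name} -> {to_worker_property(name)} ({scope})")
```

Under this mapping, `CONNECT_MAX_POLL_INTERVAL_MS` becomes the unprefixed `max.poll.interval.ms`, so it may not apply to the sink consumer that logged the poll timeout. Whether that is the case in this deployment would need to be verified against the running worker's logged consumer configuration.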


Connector config:

  connector.class: "com.snowflake.kafka.connector.SnowflakeSinkConnector"
  tasks.max: "2"
  topics: "test-topic"
  snowflake.topic2table.map: "test-topic:table1"
  buffer.count.records: "500000"
  buffer.flush.time: "240"
  buffer.size.bytes: "100000000"
  snowflake.url.name: "<url>"
  snowflake.warehouse.name: "name"
  snowflake.user.name: "username"
  snowflake.private.key: "key"
  snowflake.private.key.passphrase: "pass"
  snowflake.database.name: "db-name"
  snowflake.schema.name: "schema-name"
  key.converter: "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
  value.converter: "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"


Kindly let me know if something is misconfigured on my end.

@sfc-gh-japatel @sfc-gh-zli @sfc-gh-anwang - Any help here would be much appreciated.

Thanks !

nkkrishnakfk, Jul 05 '22 05:07

Hi @nkkrishnakfk

  1. Can you upgrade your connector to something more recent, such as 1.6.6 or 1.7.2?
  2. buffer.flush.time: "240" is 240 seconds. Can you reduce it? The default is 120 seconds. The lag is growing because data is not being flushed to the internal stage often enough. Alternatively, reduce buffer.count.records from "500000" to 10000 so that it flushes more frequently. (I don't know your throughput, so these are just suggestions.)
  3. "read load history between 2022-05-25T18:53:45Z and 2022-05-25T19:53:45Z. retrieved 0 records": this is fine. If it doesn't find records between now and the last one hour of history, we will put it into the table stage as error handling.
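To illustrate suggestion 2, here is a minimal sketch of a buffer that flushes as soon as any one of the three thresholds is reached. This is an assumed model of the behaviour, not the connector's actual code. `BufferLimits` and `should_flush` are hypothetical names, and the traffic figures (20,000 records of about 1 KB each, 60 seconds since the last flush) are made up for the example.

```python
from dataclasses import dataclass


@dataclass
class BufferLimits:
    """Thresholds named after buffer.count.records, buffer.flush.time, buffer.size.bytes."""
    count_records: int
    flush_time_s: int
    size_bytes: int


def should_flush(limits: BufferLimits, buffered_records: int,
                 buffered_bytes: int, seconds_since_flush: float) -> bool:
    """Assumed rule: flush when ANY threshold is reached."""
    return (
        buffered_records >= limits.count_records
        or buffered_bytes >= limits.size_bytes
        or seconds_since_flush >= limits.flush_time_s
    )


# Values from the reported connector config.
reported = BufferLimits(count_records=500_000, flush_time_s=240, size_bytes=100_000_000)
# Same config with the suggested lower record count.
suggested = BufferLimits(count_records=10_000, flush_time_s=240, size_bytes=100_000_000)

# Hypothetical buffer state: 20,000 records, ~20 MB, 60 s since the last flush.
state = dict(buffered_records=20_000, buffered_bytes=20_000_000, seconds_since_flush=60)

print("reported config flushes:", should_flush(reported, **state))
print("suggested config flushes:", should_flush(suggested, **state))
```

In this model the reported limits keep buffering at that state, while the lower record count triggers a flush. That matches the intent of flushing to the internal stage more often.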

sfc-gh-japatel, Jul 28 '22 15:07

Closing after 6 months of no response. Please let us know if you need further help.

sfc-gh-tzhang, Feb 24 '23 00:02