snowflake-kafka-connector
Read load history scans a past 1-hour window (from the previous month) and gets into a rebalancing loop
Kafka Connect: cp-kafka-connect-base:6.2.0
Libs: snowflake-kafka-connector-1.2.3.jar
Debugging current date: 2022-07-05
Log:
[SF_KAFKA_CONNECTOR] read load history between 2022-05-25T18:53:45Z and 2022-05-25T19:53:45Z. retrieved 0 records. (com.snowflake.kafka.connector.internal.SnowflakeIngestionServiceV1)
[2022-07-05 05:38:07,996] INFO [Consumer clientId=connector-consumer-load-0, groupId=connect-load] Member connector-consumer-load-0-15ee0dfd-ea2d-4cd8-be53-36d0e3ecbd87 sending LeaveGroup request to coordinator <hostname>:9092 (id: 2147482644 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[... the "put 0 records" line above repeats 18 more times ...]
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[... the "put 100 records" line above repeats 15 more times ...]
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 79 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 98 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:put 100 records (com.snowflake.kafka.connector.SnowflakeSinkTask)
[SF_KAFKA_CONNECTOR] read load history between 2022-05-25T18:53:45Z and 2022-05-25T19:53:45Z. retrieved 0 records. (com.snowflake.kafka.connector.internal.SnowflakeIngestionServiceV1)
This in turn triggers a rebalance, and the cycle keeps repeating. Note: I tried reducing max.poll.records from 500 to 100; the issue persists.
I couldn't understand the looping here: "read load history between 2022-05-25T18:53:45Z and 2022-05-25T19:53:45Z. retrieved 0 records". Code ref: https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeIngestionServiceV1.java#L221 Isn't this supposed to scan the last 1 hour from the current time? Note: this is happening only with one consumer/partition of the topic, which has a huge lag.
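To illustrate the question, here is a toy sketch (not the connector's actual code; the `load_history_window` helper is an assumption) contrasting a one-hour scan window anchored at a stale timestamp, as seen in the log, with one anchored at the current time:

```python
from datetime import datetime, timedelta, timezone

# Toy sketch (NOT the connector's code): a one-hour load-history
# scan window ending at some reference time.
def load_history_window(reference_time, window=timedelta(hours=1)):
    return reference_time - window, reference_time

# Anchored at a stale timestamp, like the 2022-05-25 window in the log:
# the window never covers current ingestion, so every scan retrieves
# 0 records and the loop repeats.
stale_end = datetime(2022, 5, 25, 19, 53, 45, tzinfo=timezone.utc)
stale_start, _ = load_history_window(stale_end)
print(stale_start.isoformat())  # 2022-05-25T18:53:45+00:00

# Anchored at "now", the window would cover recent ingest activity:
now_start, now_end = load_history_window(datetime.now(timezone.utc))
```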
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
events-sync 0 6408022 25524319 19116297 connector-consumer-events-sync-0-b9142c5f-3bb7-47b1-bd44-a169a7984952 /xx.xx.xx.101 connector-consumer-events-sync-0
events-sync 1 25521059 25521202 143 connector-consumer-events-sync-1-107f2aa8-969c-4d7e-87f8-fdb2be2480b3 /xx.xx.xx.102 connector-consumer-events-sync-1
It's only happening in partition 0, which has a huge lag.
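As a quick sanity check on the numbers above, the LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET:

```python
# Offsets copied from the consumer-group output for partition 0 above.
log_end_offset = 25_524_319
current_offset = 6_408_022
lag = log_end_offset - current_offset
print(lag)  # 19116297, matching the LAG column for partition 0
```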
Below are the settings used:
CONNECT_AUTO_OFFSET_RESET: "latest"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_CONFIG_STORAGE_TOPIC: "snowflakesync-config"
CONNECT_CONSUMER_MAX_POLL_RECORDS: "100"
CONNECT_CONSUMER_SESSION_TIMEOUT_MS: "30000"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_LOG4J_LOGGERS: "org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR"
CONNECT_MAX_POLL_INTERVAL_MS: "800000"
CONNECT_MAX_POLL_RECORDS: "100"
CONNECT_OFFSET_FLUSH_INTERVAL_MS: "10000"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_TOPIC: "snowflakesync-offset"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_REST_PORT: "8083"
CONNECT_SESSION_TIMEOUT_MS: "30000"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_TOPIC: "prod-snowflakesync-status-stop"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
Connect config:
connector.class: "com.snowflake.kafka.connector.SnowflakeSinkConnector"
tasks.max: "2"
topics: "test-topic"
snowflake.topic2table.map: "test-topic:table1"
buffer.count.records: "500000"
buffer.flush.time: "240"
buffer.size.bytes: "100000000"
snowflake.url.name: "<url>"
snowflake.warehouse.name: "name"
snowflake.user.name: "username"
snowflake.private.key: "key"
snowflake.private.key.passphrase: "pass"
snowflake.database.name: "db-name"
snowflake.schema.name: "schema-name"
key.converter: "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
value.converter: "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
Kindly suggest if something is misconfigured on my end.
@sfc-gh-japatel @sfc-gh-zli @sfc-gh-anwang - Any help here would be much appreciated.
Thanks !
Hi @nkkrishnakfk
- Can you upgrade your connector to something more recent, e.g. 1.6.6 or 1.7.2?
- buffer.flush.time: "240" is 240 seconds; can you reduce it to something lower? The default is 120 seconds. The lag is growing because of the lack of flushes to the internal stage. Alternatively, you can reduce buffer.count.records: "500000" to 10000 to flush more frequently. (I don't know your throughput, so these are just suggestions.)
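A minimal sketch of the suggested change against the connector config shown earlier in the thread (the values are illustrative, not prescriptive; tune them to your throughput):

```
buffer.flush.time: "120"        # down from 240; 120 seconds is the default
buffer.count.records: "10000"   # down from 500000, to flush more frequently
```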
- Regarding "read load history between 2022-05-25T18:53:45Z and 2022-05-25T19:53:45Z. retrieved 0 records": this is fine. If the connector doesn't find records in the last one hour of load history, we move the file into the table stage as error handling.
Closing after 6 months of no response. Please let us know if you need further help.