snowflake-kafka-connector

Sporadic events causing consumer lag?

Open SubhraB opened this issue 1 year ago • 2 comments

I have a Snowflake connector (deployed with Strimzi) defined as below. It consumes from a high-volume topic, and I have set a filter (from Confluent) so that the connector consumes perhaps a handful of events per day, then perhaps none for many days, then one again, so matching events are really sporadic. We are noticing that this causes the consumer lag to grow a lot. I'm trying to understand whether the issue is on the connector side, in the filter or configuration, or in the Kafka broker.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: connect-dev-ext
  name: my-snowflake-sink-connector
spec:
  autoRestart:
    enabled: true
  class: com.snowflake.kafka.connector.SnowflakeSinkConnector
  config:
    buffer.flush.time: 60
    errors.log.enable: true
    errors.log.include.messages: true
    errors.tolerance: all
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    key.ignore: true
    schema.ignore: true
    snowflake.database.name: MY_DB
    snowflake.private.key: '${secrets:strimzi-kafka/snowflake-secrets:privateKey}'
    snowflake.private.key.passphrase: '${secrets:strimzi-kafka/snowflake-secrets:privateKeyPassphrase}'
    snowflake.schema.name: my_schema
    snowflake.topic2table.map: 'my_kafka_topic:my_snowflake_table'
    snowflake.url.name: '${secrets:strimzi-kafka/snowflake-secrets:accountUrl}'
    snowflake.user.name: '${secrets:strimzi-kafka/snowflake-secrets:userName}'
    topics: my_kafka_topic
    transforms: snowflakeFilter
    transforms.snowflakeFilter.filter.condition: '$[?("myValue" in @.my_key)]' # my_key is an array of strings
    transforms.snowflakeFilter.filter.type: include
    transforms.snowflakeFilter.missing.or.null.behavior: exclude
    transforms.snowflakeFilter.type: io.confluent.connect.transforms.Filter$Value
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
  tasksMax: 1
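The filter settings above (`filter.condition` with `filter.type: include` and `missing.or.null.behavior: exclude`) are meant to keep only records whose `my_key` array contains `"myValue"`. A minimal Python sketch of that intended predicate, assuming the value has already been deserialized by `JsonConverter` into a dict; the function name `keep_record` is hypothetical, and this is not Confluent's JsonPath implementation:

```python
from typing import Optional


def keep_record(value: Optional[dict]) -> bool:
    """Hypothetical model of the include filter on my_key.

    True means the record would be passed on to the sink task;
    False means the transform would drop it.
    """
    # A null value counts as missing and is excluded
    # (models missing.or.null.behavior: exclude).
    if value is None:
        return False
    my_key = value.get("my_key")
    # A missing or null my_key field is also excluded.
    if my_key is None:
        return False
    # filter.type: include -> keep only when the array contains "myValue".
    return isinstance(my_key, list) and "myValue" in my_key


records = [
    {"my_key": ["a", "myValue"]},
    {"my_key": ["a", "b"]},
    {"other": 1},
    None,
]
print([keep_record(r) for r in records])  # [True, False, False, False]
```

On a high-volume topic almost every record evaluates to False here, and in Kafka Connect a record dropped by a transform never reaches the sink task's `put()`, so the task can go a long time without seeing any records at all.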

We have another Snowflake connector, without any filter and on a different topic, that works as expected.

SubhraB · Mar 08 '24 00:03

If you can attach logs or exceptions, that would be helpful.

sfc-gh-japatel · Mar 08 '24 02:03

snowflake.log

Hi @sfc-gh-japatel! Thanks for responding. I'm attaching the logs here, starting from the point where the connector receives the first event. Here are the steps as they happened:

  1. To the above configuration, I added only consumer.override.auto.offset.reset: latest.
  2. This particular topic receives tens of gigabytes of messages. At first, the Snowflake connector does not process anything (because of the filter).
  3. Then I push some events that match the filter. I see the connector processing those events, and at some point I believe the connector commits the offset.
  4. But after that the connector falls silent. Thousands of non-matching events arrive; the connector appears to process none of them (expected), but it also does not commit any offsets.
  5. Then I push more matching events, but the connector still doesn't process them and the lag continues to grow.
  6. It only works again after I delete and recreate the consumer.
  7. Starting from 2024-03-08 15:25:23,269, I see log entries about the connector being stopped and removed. This happens only with this connector (the one with the filter); the others are running fine.
  8. I also see the log message "consumer pro-actively leaving the group".
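One plausible reading of steps 3 and 4, offered as a hypothesis rather than a confirmed diagnosis: Kafka Connect lets a sink task choose which offsets get committed (through `SinkTask.preCommit`), and records dropped by a transform never reach the task's `put()`. If a sink commits only the offsets of records it has actually processed, a long run of filtered-out records leaves the committed offset where it was, so the reported lag (log-end offset minus committed offset) keeps growing. The toy single-partition model below illustrates that effect; it is not the connector's code, the names `keep`, `committed_offset` and `lag` are hypothetical, and it does not explain the connector stopping or the consumer leaving the group (steps 5, 7 and 8).

```python
from typing import Callable, List, Optional

Record = Optional[dict]


def keep(value: Record) -> bool:
    # Hypothetical stand-in for the include filter on my_key.
    return value is not None and "myValue" in (value.get("my_key") or [])


def committed_offset(records: List[Record],
                     predicate: Callable[[Record], bool],
                     commit_consumed: bool) -> int:
    """Return the offset a toy sink would commit after reading all records.

    commit_consumed=False: commit only past the last record that passed
    the filter (i.e. the last record the sink actually processed).
    commit_consumed=True: commit past every consumed record, filtered or not.
    A committed offset is the next offset to read, hence offset + 1.
    """
    committed = 0
    for offset, value in enumerate(records):
        if predicate(value) or commit_consumed:
            committed = offset + 1
    return committed


def lag(records: List[Record], committed: int) -> int:
    # Consumer lag = log-end offset - committed offset.
    return len(records) - committed


# Two matching events, then a long run of non-matching ones.
partition = [{"my_key": ["myValue"]}] * 2 + [{"my_key": ["other"]}] * 10_000
print(lag(partition, committed_offset(partition, keep, commit_consumed=False)))  # 10000
print(lag(partition, committed_offset(partition, keep, commit_consumed=True)))   # 0
```

In the first case the lag equals the number of filtered records since the last match, which matches the symptom in step 4; whether the real connector behaves this way would have to be confirmed from its source or logs.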

SubhraB · Mar 08 '24 15:03

Closing this. I worked around the issue by replacing "Kafka connector with filter" with "Kafka Streams filter + Kafka connector without filter".
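The workaround moves the filtering out of the connector: a Kafka Streams application reads the high-volume topic, keeps only matching records and writes them to a separate topic, and the Snowflake connector, now without a transform, sinks that topic. Every record in the connector's input is one it processes, so its committed offset can keep up with the log-end offset, while the Streams application commits its position on the source topic for everything it consumes, including records it drops. A minimal pure-Python model of that two-stage pipeline follows; the function names are hypothetical, and a real implementation would use the Kafka Streams `filter` and `to` operators in Java rather than lists.

```python
from typing import List, Optional, Tuple

Record = Optional[dict]


def matches(value: Record) -> bool:
    # Hypothetical stand-in for the predicate the Streams app applies.
    return value is not None and "myValue" in (value.get("my_key") or [])


def streams_filter(source: List[Record]) -> Tuple[List[Record], int]:
    """Stage 1: copy matching records to a separate, filtered topic.

    Returns the filtered topic and the source offset the Streams app
    commits (past every consumed record, whether kept or dropped).
    """
    filtered = [v for v in source if matches(v)]
    return filtered, len(source)


def sink_without_filter(topic: List[Record]) -> int:
    """Stage 2: the connector processes every record it reads,
    so it can commit past the last one."""
    committed = 0
    for offset, _value in enumerate(topic):
        committed = offset + 1  # every record reaches the sink task
    return committed


source = ([{"my_key": ["myValue"]}]
          + [{"my_key": ["other"]}] * 5000
          + [{"my_key": ["myValue"]}])
filtered, streams_committed = streams_filter(source)
sink_committed = sink_without_filter(filtered)
# Lag on each stage: source topic for the Streams app, filtered topic for the sink.
print(len(source) - streams_committed, len(filtered) - sink_committed)  # 0 0
```

Splitting the pipeline this way means neither consumer has to wait for a rare matching record before its committed offset can advance.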

SubhraB · Apr 09 '24 19:04