Sporadic events causing consumer lag?
I have a Snowflake sink connector (deployed with Strimzi), defined as below. It consumes from a high-volume topic, and I have set a filter (the Confluent Filter transform) that lets through maybe a handful of events per day, then possibly none for many days, then one more, so the matching events are really sporadic. We are noticing that this causes the consumer lag to grow a lot. I'm trying to understand whether the issue is on the connector side, in the filter or the configuration, or in the Kafka broker.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: connect-dev-ext
  name: my-snowflake-sink-connector
spec:
  autoRestart:
    enabled: true
  class: com.snowflake.kafka.connector.SnowflakeSinkConnector
  config:
    buffer.flush.time: 60
    errors.log.enable: true
    errors.log.include.messages: true
    errors.tolerance: all
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    key.ignore: true
    schema.ignore: true
    snowflake.database.name: MY_DB
    snowflake.private.key: '${secrets:strimzi-kafka/snowflake-secrets:privateKey}'
    snowflake.private.key.passphrase: '${secrets:strimzi-kafka/snowflake-secrets:privateKeyPassphrase}'
    snowflake.schema.name: my_schema
    snowflake.topic2table.map: 'my_kafka_topic:my_snowflake_table'
    snowflake.url.name: '${secrets:strimzi-kafka/snowflake-secrets:accountUrl}'
    snowflake.user.name: '${secrets:strimzi-kafka/snowflake-secrets:userName}'
    topics: my_kafka_topic
    transforms: snowflakeFilter
    transforms.snowflakeFilter.filter.condition: '$[?("myValue" in @.my_key)]' # my_key is an array of strings
    transforms.snowflakeFilter.filter.type: include
    transforms.snowflakeFilter.missing.or.null.behavior: exclude
    transforms.snowflakeFilter.type: io.confluent.connect.transforms.Filter$Value
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
  tasksMax: 1
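For readers unfamiliar with the JsonPath condition, the Python sketch below approximates which record values the filter configuration above is meant to keep: include a record only when my_key is an array containing "myValue", and drop it when my_key is missing or null. This is an approximation of the intent under stated assumptions, not the Confluent Filter transform's actual implementation. The function name should_include is hypothetical, and treating a non-object value or a non-array my_key as a non-match is an assumption.

```python
from typing import Any, Mapping

TARGET = "myValue"  # the literal used in filter.condition


def should_include(value: Any) -> bool:
    """Hypothetical approximation of '$[?("myValue" in @.my_key)]'
    with filter.type=include and missing.or.null.behavior=exclude."""
    if not isinstance(value, Mapping):
        return False  # assumption: non-object values are excluded
    my_key = value.get("my_key")
    if my_key is None:
        return False  # missing or null my_key -> exclude
    if not isinstance(my_key, list):
        return False  # assumption: a non-array my_key never matches
    return TARGET in my_key


records = [
    {"my_key": ["a", "myValue"]},
    {"my_key": ["a", "b"]},
    {"other": 1},
    {"my_key": None},
]
kept = [r for r in records if should_include(r)]
print(kept)  # [{'my_key': ['a', 'myValue']}]
```

On a high-volume topic this means almost every record is dropped by the transform, so the sink itself sees only the rare matching ones.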
We have another Snowflake connector without any filter, on a different topic, and it works as expected.
If you can attach logs or exceptions, that would be helpful.
Hi @sfc-gh-japatel! Thanks for responding. I'm attaching the logs from the point where the connector receives the first event. Here are the steps as they happened:
- To the above configuration, I added only consumer.override.auto.offset.reset: latest.
- This particular topic gets tens of gigabytes of messages. In the beginning, the Snowflake connector does not process anything (because of the filter).
- Then I push some events that match the filter. I see the connector processing those events, and at some point I believe the connector commits the offset.
- After that, the connector falls silent. There are thousands of non-matching events; the connector doesn't process any of them (expected), but it also does not commit any offsets.
- Then I push more matching events. The connector still doesn't process them, and the lag continues to grow.
- It only works again after I delete and recreate the consumer.
- I see some logs, starting at 2024-03-08 15:25:23,269, saying something about the connector being stopped and removed. This happens only with the connector that has the filter; the others are running fine.
- I also see the log: consumer pro-actively leaving the group
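One plausible explanation for the growing lag, offered as an assumption rather than something confirmed in this thread, is that the connector reports committable offsets only for records it has actually received and written. Records dropped by an SMT are never passed to the sink task, so under that assumption the committed offset stays at the last matching record while the log end offset keeps moving. The Python sketch below is a hypothetical model of that behavior (Record, SimulatedSink, and lag are illustrative names, not Snowflake connector code), comparing it with a sink that also advances past filtered records.

```python
from dataclasses import dataclass, field
from typing import Iterable, List


@dataclass
class Record:
    offset: int
    matches: bool  # True if the record passes the SMT filter


@dataclass
class SimulatedSink:
    """Hypothetical model, not the Snowflake connector's code.

    committed is Kafka-style: the next offset to consume.
    """
    advance_on_filtered: bool
    committed: int = 0
    written: List[int] = field(default_factory=list)

    def process(self, batch: Iterable[Record]) -> None:
        for rec in batch:
            if rec.matches:
                # The record reaches the sink and is written to the table.
                self.written.append(rec.offset)
                self.committed = rec.offset + 1
            elif self.advance_on_filtered:
                # Filtered record: only this variant still moves the offset.
                self.committed = rec.offset + 1


def lag(end_offset: int, committed: int) -> int:
    """Consumer lag = log end offset minus committed offset."""
    return end_offset - committed


END_OFFSET = 1000
records = [Record(o, o in (5, 10)) for o in range(END_OFFSET)]

stuck = SimulatedSink(advance_on_filtered=False)
healthy = SimulatedSink(advance_on_filtered=True)
stuck.process(records)
healthy.process(records)

print(lag(END_OFFSET, stuck.committed), lag(END_OFFSET, healthy.committed))  # 989 0
```

Both variants write the same two records, but only the second keeps the committed offset in step with the topic. This model covers the "no offsets committed" symptom only; it does not explain why new matching events stopped being processed, which the "stopped and removed" and "leaving the group" logs point to separately.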
Closing this. I worked around the issue by replacing "Kafka connector with filter" with "Kafka Streams filter + Kafka connector without filter".
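The workaround moves the filtering out of Kafka Connect: a Kafka Streams job copies only the matching records to a separate topic, and the connector reads that topic with no filter. The Python sketch below models this with plain lists standing in for topics. The names matches, prefilter, and sink_without_filter are hypothetical, and the real pre-filter step would be a Kafka Streams topology along the lines of stream.filter(...).to(...) in Java. Because every record in the derived topic reaches the sink, its committed offset can keep up with that topic's end offset.

```python
from typing import Any, Callable, Dict, List

Value = Dict[str, Any]


def matches(value: Value) -> bool:
    # Hypothetical predicate mirroring the connector's original filter condition.
    my_key = value.get("my_key")
    return isinstance(my_key, list) and "myValue" in my_key


def prefilter(source_topic: List[Value], predicate: Callable[[Value], bool]) -> List[Value]:
    # Stand-in for the Kafka Streams job: keep only matching records in a new topic.
    # Offsets in the new topic are simply list indices.
    return [v for v in source_topic if predicate(v)]


def sink_without_filter(topic: List[Value]) -> int:
    # With no SMT, every record reaches the sink, so the committed offset
    # (next offset to read) follows the topic's end offset.
    committed = 0
    for offset in range(len(topic)):
        # ... write topic[offset] to the Snowflake table here ...
        committed = offset + 1
    return committed


source = [{"my_key": ["x"]} for _ in range(998)] + [{"my_key": ["myValue"]}, {"my_key": ["y"]}]
filtered = prefilter(source, matches)
committed = sink_without_filter(filtered)
print(len(filtered), len(filtered) - committed)  # 1 0
```

The high-volume source topic is then consumed by the Streams application, which commits offsets for the records it reads whether or not it forwards them; that is presumably why this arrangement avoids the stalled offsets.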