[watermill-kafka] Stress test fails in CI
Because of the heavy load, the Watermill Kafka tests fail in CI. The problem also occurs locally unless I raise the `ulimit`.
There are probably several possible solutions:
- find a way to set `ulimit` in CI (I tried doing it in the docker-compose config, but it didn't help)
- find out whether we can change something in the code to limit the load
- reduce the number of parallel tests (not preferred)
Example failed build: https://circleci.com/gh/ThreeDotsLabs/watermill-kafka/235?utm_campaign=vcs-integration-link&utm_medium=referral&utm_source=github-build-link
@roblaszczak @m110
I believe I have fixed the issue. I made some modifications to `ConsumeClaim` and ran the stress tests locally, and they passed. I would fork the repo and submit a PR, but I have already forked it for my own confluent-kafka-go implementation, so I will just post the code snippet here for you or someone else with write access to the repo.
```go
// ConsumeClaim processes the messages of a single partition claim and checks
// for shutdown after each processed message.
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	logFields := h.messageLogFields.Copy().Add(watermill.LogFields{
		"kafka_partition":      claim.Partition(),
		"kafka_initial_offset": claim.InitialOffset(),
	})

	// The loop ends when sarama closes the claim's Messages channel.
	for kafkaMsg := range claim.Messages() {
		h.logger.Debug("Message claimed", logFields)
		if err := h.messageHandler.processMessage(h.ctx, kafkaMsg, sess, logFields); err != nil {
			return err
		}

		// Non-blocking check: stop if the subscriber is closing or the
		// context was cancelled, otherwise go on to the next message.
		select {
		case <-h.closing:
			h.logger.Debug("Subscriber is closing, stopping consumerGroupHandler", logFields)
			return nil
		case <-h.ctx.Done():
			h.logger.Debug("Ctx was cancelled, stopping consumerGroupHandler", logFields)
			return nil
		default:
			continue
		}
	}
	return nil
}
```
Hope this helps!