watermill icon indicating copy to clipboard operation
watermill copied to clipboard

[watermill-kafka] Stress test fails in CI

Open roblaszczak opened this issue 5 years ago • 1 comments

Because of too big load, Watermill Kafka tests are failing in CI. Locally the problem also occurs, until I will not set ulimit.

They are probably multiple solutions:

  • find a way to set ulimit in CI (I tried to do it in docker-compose config, but didn't help)
  • find, if we can fix something in the code in order to limit the load
  • reduce the number of parallel tests (not preferred)

Example failed build: https://circleci.com/gh/ThreeDotsLabs/watermill-kafka/235?utm_campaign=vcs-integration-link&utm_medium=referral&utm_source=github-build-link

roblaszczak avatar Jan 28 '20 19:01 roblaszczak

@roblaszczak @m110

I believe I have fixed the issue. I made some modifications and ran the stress tests locally, which passed. I would fork and submit a PR, but I have already forked for my own confluent-kafka-go implementation, so I will just post the code snippet here for you or someone else with permissions to the repo.

func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

	logFields := h.messageLogFields.Copy().Add(watermill.LogFields{
		"kafka_partition":      claim.Partition(),
		"kafka_initial_offset": claim.InitialOffset(),
	})

	for kafkaMsg := range claim.Messages() {
		h.logger.Debug("Message claimed", logFields)
		if err := h.messageHandler.processMessage(h.ctx, kafkaMsg, sess, logFields); err != nil {
			return err
		}
		select {
		case <-h.closing:
			h.logger.Debug("Subscriber is closing, stopping consumerGroupHandler", logFields)
			return nil

		case <-h.ctx.Done():
			h.logger.Debug("Ctx was cancelled, stopping consumerGroupHandler", logFields)
			return nil
		default:
			continue
		}
	}

	return nil
}

Hope this helps!

codingDr avatar Nov 21 '22 23:11 codingDr