
Kafka consumption problem

Open teivah opened this issue 5 years ago • 3 comments

Hello (from a UW newcomer),

I do like substrate, but there's something I'm really puzzled by.

Let's say I have the following use case (for integration testing or whatever):

  • I want to create a Kafka source with kafka.OffsetNewest
  • Once the consumer is ready to consume messages, I want to send a message and check whether it's been received by the consumer

Nothing really fancy, is it? Yet, with the current implementation, I can't do it properly without resorting to an arbitrary time.Sleep.

The problem, in my humble opinion, is that we can't know when the Kafka client has been created (the call to cluster.NewConsumerFromClient). We should also bear in mind that this call might take some time, for example during a consumer-group rebalance.
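
To make the race concrete, here is a toy in-memory model of "start from the newest offset" semantics. It is not substrate or Kafka code: topic, subscribeNewest, publish and received are hypothetical names, and the model assumes a consumer group with no committed offset, so a new subscriber only sees messages published after it has joined.

```go
package main

import (
	"fmt"
	"sync"
)

// topic is a toy, in-memory stand-in for a Kafka topic (hypothetical, illustration only).
// A subscriber that joins "at the newest offset" only sees messages published after it
// joined, roughly what kafka.OffsetNewest gives a consumer group with no committed offset.
type topic struct {
	mu   sync.Mutex
	subs []chan string
}

// subscribeNewest registers a subscriber that starts at the current end of the log.
func (t *topic) subscribeNewest() <-chan string {
	t.mu.Lock()
	defer t.mu.Unlock()
	ch := make(chan string, 16)
	t.subs = append(t.subs, ch)
	return ch
}

// publish delivers msg to every subscriber registered so far; later subscribers never see it.
func (t *topic) publish(msg string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, ch := range t.subs {
		ch <- msg
	}
}

// received reports whether a message is already waiting on ch, without blocking.
func received(ch <-chan string) (string, bool) {
	select {
	case m := <-ch:
		return m, true
	default:
		return "", false
	}
}

func main() {
	var tp topic

	// Race lost: the message is published before the consumer joins, so it is never delivered.
	tp.publish("early")
	late := tp.subscribeNewest()
	_, ok := received(late)
	fmt.Println("early message received:", ok) // early message received: false

	// Race won: the consumer has joined before the publish, so the message is delivered.
	tp.publish("on time")
	m, ok := received(late)
	fmt.Println("second message received:", ok, m) // second message received: true on time
}
```

This is exactly why the test needs to know when the consumer has joined: without that signal, the only option is to sleep and hope.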

In my view, ConsumeMessages(ctx context.Context, messages chan<- substrate.Message, acks <-chan substrate.Message) error should be designed this way:

  • The Kafka client is created synchronously in the current goroutine. If that fails, we return the error.
  • If it succeeds, we start the logic that consumes the Kafka messages and passes them to the caller in another goroutine, and return a nil error straight away.

As a user of substrate, I would then know when my consumer is ready. It would then be up to me to run it in another goroutine if I want, and synchronize it with a WaitGroup or whatever.

What do you guys think?

teivah, May 22 '19 12:05

I normally solve this problem by timing out on a select statement that watches both the timer and the message-consuming piece. If it takes longer than X, the test is considered failed, but it also completes immediately once the message is consumed. There is always the possibility that X wasn't large enough, but such are the joys of integration testing.

thinktainer, May 22 '19 13:05

I started to reply, but I realised I don't fully understand the case.

Do you have (annotated) example code that shows the problem, or a failing unit test that demonstrates it? @teivah

mjgarton, May 22 '19 13:05

@mjgarton Sure:

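// Assumed context: this runs inside func TestXxx(t *testing.T); msg is presumably a
// []byte-based type implementing substrate.Message, and random a *rand.Rand defined elsewhere.
var input msg = []byte(strconv.Itoa(random.Intn(10000)))
ctxCancel, cancel := context.WithCancel(context.Background())
defer cancel()

// Source starting from the newest offset: only messages published after the
// consumer has joined will be received.
source, err := broker.NewAsyncKafkaSource(brokers, topic, "cg", version, kafka.OffsetNewest)
if err != nil {
	assert.FailNow(t, err.Error())
}
sourceAck := make(chan substrate.Message, 1)
sourceMsg := make(chan substrate.Message, 1)
go func() {
	// ConsumeMessages blocks for the lifetime of the consumer. FailNow must only be
	// called from the test goroutine, so errors are reported with assert.Fail here.
	if err := source.ConsumeMessages(ctxCancel, sourceMsg, sourceAck); err != nil {
		assert.Fail(t, err.Error())
	}
}()
// The workaround in question: nothing signals that the consumer is ready,
// so the test sleeps and hopes 10 seconds is long enough.
time.Sleep(10 * time.Second)

sink, err := broker.NewAsyncKafkaSink(brokers, topic, version)
if err != nil {
	assert.FailNow(t, err.Error())
}
sinkAck := make(chan substrate.Message, 1)
sinkMsg := make(chan substrate.Message, 1)
sinkMsg <- input
go func() {
	if err := sink.PublishMessages(ctxCancel, sinkAck, sinkMsg); err != nil {
		assert.Fail(t, err.Error())
	}
}()

// Wait up to 3 seconds for the message; keep the cancel func so the timer is released.
ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 3*time.Second)
defer cancelTimeout()
select {
case m := <-sourceMsg:
	assert.Equal(t, input.Data(), m.Data())
case <-ctxTimeout.Done():
	assert.FailNow(t, "message not received")
}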

Basically, we have a component called energy-services that uses substrate and exposes functions to create a substrate.AsyncMessageSink etc.

In my example, I create a source starting from the newest offset, then create a sink, send a message, and check that the message is received within 3 seconds.

My problem is with the source.ConsumeMessages call. There is no way to know synchronously that the consumer is ready to listen for messages (the time needed to create a consumer depends on many things and can range from a few milliseconds to several seconds). As a result, I have to add a time.Sleep to wait for the consumer to be created.

In my opinion, ConsumeMessages should return as soon as a consumer has been created. For listening to the incoming messages, there would then be two options:

  • Either ConsumeMessages triggers the consumption in another goroutine (so as not to block callers of ConsumeMessages)
  • Or the consumption is moved into a separate function (and it is then up to clients whether to run it in another goroutine)

Is this clearer?

teivah, May 29 '19 07:05