substrate
Kafka consumption problem
Hello (from a UW newcomer),
I do like substrate, but there's something I'm really puzzled by.
Let's say I have the following use case (for integration testing or whatever):
- I want to create a Kafka source with kafka.OffsetNewest
- Once the consumer is ready to consume messages, I want to send a message and check whether it's been received by the consumer
Nothing really fancy, is it? Yet, with the current implementation, I can't do it properly without resorting to an arbitrary time.Sleep.
The problem, in my humble opinion, is that we can't know when the Kafka client has been created (the call to cluster.NewConsumerFromClient). We should also bear in mind that this call might take some time, for example during a rebalance.
For me, ConsumeMessages(ctx context.Context, messages chan<- substrate.Message, acks <-chan substrate.Message) error should be designed this way:
- The creation of the Kafka client is done synchronously in the current goroutine. If there is an error we return it.
- If this part was OK, we start another goroutine that consumes the Kafka messages and passes them to the client. Meanwhile, we return a nil error.
As a user of substrate, I would then know when my consumer is ready. It would then be up to me to call it from another goroutine if I want, synchronize it with a WaitGroup, or whatever.
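To make the proposal concrete, here is a minimal sketch of the two steps using only the standard library. It is not substrate's API: Message, fakeClient, connect and StartConsuming are hypothetical stand-ins, the 20 ms delay merely imitates a slow client creation, and the extra error channel for the background loop is my own assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Message is a hypothetical stand-in for substrate.Message.
type Message interface{ Data() []byte }

type rawMessage []byte

func (m rawMessage) Data() []byte { return m }

// fakeClient is a hypothetical stand-in for the Kafka consumer client.
type fakeClient struct{ in chan Message }

// connect imitates the slow, fallible client creation step.
func connect(ctx context.Context, brokers []string) (*fakeClient, error) {
	if len(brokers) == 0 {
		return nil, errors.New("no brokers configured")
	}
	select {
	case <-time.After(20 * time.Millisecond): // pretend a rebalance is in progress
		return &fakeClient{in: make(chan Message, 1)}, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// StartConsuming creates the client synchronously and returns any setup error.
// On success the consume loop runs in a background goroutine and the returned
// channel receives the loop's final error.
func StartConsuming(ctx context.Context, brokers []string, messages chan<- Message) (*fakeClient, <-chan error, error) {
	client, err := connect(ctx, brokers)
	if err != nil {
		return nil, nil, err
	}
	done := make(chan error, 1)
	go func() {
		for {
			select {
			case m := <-client.in:
				select {
				case messages <- m:
				case <-ctx.Done():
					done <- ctx.Err()
					return
				}
			case <-ctx.Done():
				done <- ctx.Err()
				return
			}
		}
	}()
	return client, done, nil
}

// roundTrip publishes payload only after StartConsuming has returned, then
// returns what the consumer delivered.
func roundTrip(payload string) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	messages := make(chan Message, 1)
	client, _, err := StartConsuming(ctx, []string{"localhost:9092"}, messages)
	if err != nil {
		return "", err
	}
	client.in <- rawMessage(payload) // no sleep needed: the consumer is known to be ready
	select {
	case m := <-messages:
		return string(m.Data()), nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	got, err := roundTrip("42")
	fmt.Println(got, err)
}
```

The point of the sketch is only the ordering: once StartConsuming returns without an error, the caller can publish immediately.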
What do you guys think?
I normally solve this problem by timing out on a select statement that watches the timer and the message consuming piece. If it takes longer than X it'll be considered a failed test, but also completes immediately when the message is consumed. Now there is always the possibility that X wasn't large enough, but such are the joys of integration testing.
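A rough sketch of that select-with-timeout pattern, using only the standard library. The helper name waitForMessage and the []byte payload type are illustrative assumptions, not part of substrate.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeout = errors.New("timed out waiting for message")

// waitForMessage returns the first value received on ch, or errTimeout if
// nothing arrives within d. It returns as soon as a message is available.
func waitForMessage(ch <-chan []byte, d time.Duration) ([]byte, error) {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case m := <-ch:
		return m, nil
	case <-timer.C:
		return nil, errTimeout
	}
}

// demo delivers a message after a short delay and waits for it.
func demo() (string, error) {
	ch := make(chan []byte, 1)
	go func() {
		time.Sleep(10 * time.Millisecond)
		ch <- []byte("hello")
	}()
	m, err := waitForMessage(ch, time.Second)
	return string(m), err
}

// timesOut reports whether waiting on a channel that never delivers
// produces errTimeout.
func timesOut() bool {
	_, err := waitForMessage(make(chan []byte), 20*time.Millisecond)
	return errors.Is(err, errTimeout)
}

func main() {
	fmt.Println(demo())
	fmt.Println(timesOut())
}
```

This bounds how long the test waits for the message, but it does not help if the message is published before the consumer is ready and the source starts from the newest offset.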
I started to reply, but I realised I don't fully understand the case.
Do you have example (annotated) code that shows the problem? Or a failing unit test to show the problem? @teivah
@mjgarton Sure:
	var input msg = []byte(strconv.Itoa(random.Intn(10000)))
	ctxCancel, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Source that starts from the newest offset.
	source, err := broker.NewAsyncKafkaSource(brokers, topic, "cg", version, kafka.OffsetNewest)
	if err != nil {
		assert.FailNow(t, err.Error())
	}
	sourceAck := make(chan substrate.Message, 1)
	sourceMsg := make(chan substrate.Message, 1)
	go func() {
		if err := source.ConsumeMessages(ctxCancel, sourceMsg, sourceAck); err != nil {
			// FailNow must only be called from the test goroutine, so use Fail here.
			assert.Fail(t, err.Error())
		}
	}()

	// Workaround: nothing tells us when the consumer is ready, so we sleep.
	time.Sleep(10000 * time.Millisecond)

	sink, err := broker.NewAsyncKafkaSink(brokers, topic, version)
	if err != nil {
		assert.FailNow(t, err.Error())
	}
	sinkAck := make(chan substrate.Message, 1)
	sinkMsg := make(chan substrate.Message, 1)
	sinkMsg <- input
	go func() {
		if err := sink.PublishMessages(ctxCancel, sinkAck, sinkMsg); err != nil {
			assert.Fail(t, err.Error())
		}
	}()

	// The message must arrive within 3 seconds.
	ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 3000*time.Millisecond)
	defer cancelTimeout()
	select {
	case m := <-sourceMsg:
		assert.Equal(t, input.Data(), m.Data())
	case <-ctxTimeout.Done():
		assert.FailNow(t, "message not received")
	}
Basically, we have a component called energy-services that uses substrate and exposes functions to create a substrate.AsyncMessageSink, etc.
In my example, I create a source starting from the newest offset and I want to create a sink, send a message and check this message was received within 3 seconds.
My problem is with the source.ConsumeMessages call. There is no way to know synchronously that the consumer is ready to listen for messages (the time to create a consumer depends on many things and may range from a few milliseconds to several seconds).
As a result, I am obliged to add a time.Sleep to wait for the consumer to be created.
In my opinion, ConsumeMessages should return as soon as a consumer has been created. Then, to listen to the incoming messages, there are two options:
- Either ConsumeMessages triggers the consumption in another goroutine (so as not to block ConsumeMessages clients)
- Or the consumption is done in another function (and it is then up to the clients to call it within another goroutine or not)
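As an illustration of the second option, here is a minimal standard-library sketch in which setup and consumption are separate functions. Consumer, NewConsumer and Consume are hypothetical names chosen for the example, and the in channel stands in for the Kafka topic; none of this is substrate's actual API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Consumer is a hypothetical handle to an already-connected consumer.
type Consumer struct{ in chan []byte }

// NewConsumer performs the setup synchronously: when it returns a nil error,
// the consumer exists and is ready.
func NewConsumer(brokers []string) (*Consumer, error) {
	if len(brokers) == 0 {
		return nil, errors.New("no brokers configured")
	}
	return &Consumer{in: make(chan []byte, 1)}, nil
}

// Consume blocks, forwarding messages to out until ctx is cancelled.
func (c *Consumer) Consume(ctx context.Context, out chan<- []byte) error {
	for {
		select {
		case m := <-c.in:
			select {
			case out <- m:
			case <-ctx.Done():
				return ctx.Err()
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// demo shows a caller that chooses to run Consume in its own goroutine.
func demo() (string, error) {
	c, err := NewConsumer([]string{"localhost:9092"})
	if err != nil {
		return "", err
	}
	ctx, cancel := context.WithCancel(context.Background())
	out := make(chan []byte, 1)
	done := make(chan error, 1)
	go func() { done <- c.Consume(ctx, out) }()

	c.in <- []byte("ping") // the consumer already exists, so no sleep is needed
	got := string(<-out)
	cancel()
	if err := <-done; !errors.Is(err, context.Canceled) {
		return got, err
	}
	return got, nil
}

func main() {
	fmt.Println(demo())
}
```

With this split, the decision to use a goroutine belongs entirely to the caller, and a blocking Consume can still report its final error directly.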
Is this clearer?