cppkafka

producer.flush() ?

Open orwel1984 opened this issue 7 years ago • 11 comments

If you try the simple producer example on your first page without producer.flush():

#include <cppkafka/cppkafka.h>

using namespace std;
using namespace cppkafka;

int main() {
    // Create the config
    Configuration config = {
        { "metadata.broker.list", "127.0.0.1:9092" }
    };

    // Create the producer
    Producer producer(config);

    // Produce a message!
    string message = "hey there!";
    producer.produce(MessageBuilder("my_topic").partition(0).payload(message));
    // producer.flush();
}

That is, leave out the producer.flush() call at the end.
Then create one consumer with group ID 0, consume one message, and kill it. Now create a second consumer with group ID 0: it will be a few minutes before this one gets assigned.

It looks like calling producer.flush() is very important, yet your example code elsewhere doesn't call it?

orwel1984 avatar Oct 19 '18 07:10 orwel1984
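
For readers wondering why the missing flush matters: producers built on librdkafka queue messages locally and deliver them in the background, so a program that returns from main right after produce() may exit before anything is sent. The toy model below is only a sketch of that enqueue-then-flush behaviour. It is not cppkafka code, and `ToyProducer` with all of its members is a hypothetical name invented for illustration.

```cpp
#include <cstddef>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an asynchronous producer; NOT part of cppkafka.
class ToyProducer {
public:
    // Models an asynchronous produce call: the message is only queued locally.
    void produce(std::string payload) { pending_.push(std::move(payload)); }

    // Models flush(): drains the local queue, "delivering" each pending message.
    void flush() {
        while (!pending_.empty()) {
            delivered_.push_back(std::move(pending_.front()));
            pending_.pop();
        }
    }

    std::size_t pending() const { return pending_.size(); }
    const std::vector<std::string>& delivered() const { return delivered_; }

private:
    std::queue<std::string> pending_;     // produced but not yet delivered
    std::vector<std::string> delivered_;  // delivered by flush()
};
```

In this model a message stays in `pending()` until `flush()` runs, which mirrors why the example in the first post needs its commented-out producer.flush() call before main returns.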

True, there should be some comment regarding flushing the producer. I'll add that now, thanks.

mfontanini avatar Oct 19 '18 15:10 mfontanini

What about sync and async producing? librdkafka has a dedicated wiki page about this: https://github.com/edenhill/librdkafka/wiki/Sync-producer

How should a sync producer be implemented using cppkafka?

EvilBeaver avatar Oct 25 '18 15:10 EvilBeaver

You can use the BufferedProducer and use BufferedProducer::sync_produce. This does a similar thing to what the rdkafka wiki entry talks about under the hood.

Normally I always use this class because it has a smarter flushing mechanism than the Producer, which simply calls rd_kafka_flush.

mfontanini avatar Oct 25 '18 15:10 mfontanini

@mfontanini can you give a small example of how to use BufferedProducer and BufferedProducer::sync_produce() ?

orwel1984 avatar Oct 25 '18 21:10 orwel1984

Did you try reading the documentation? It's pretty self-explanatory. See the constructor and BufferedProducer::sync_produce. There's also this example which uses the buffered producer (it does async production, but the changes needed are minimal).

mfontanini avatar Oct 25 '18 21:10 mfontanini

It seems that the buffered producer falls into an endless loop when there are connection timeouts.

https://github.com/mfontanini/cppkafka/blob/master/include/cppkafka/utils/buffered_producer.h#L600

I don't know why, but producer_.flush() in this code always times out, which leads to an infinite loop. Why would flush time out every time?

EvilBeaver avatar Oct 29 '18 13:10 EvilBeaver
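
One generic way to avoid looping forever on repeated flush timeouts is to bound the retries with an overall deadline. The sketch below uses only the standard library; `retry_until_deadline` and the `attempt` callable are hypothetical names, and this is not how buffered_producer.h is implemented. The assumption is that `attempt` wraps one blocking flush-with-timeout call and reports whether it succeeded.

```cpp
#include <chrono>
#include <functional>

// Calls `attempt` repeatedly until it returns true or `budget` has elapsed.
// Returns true on success, false if the deadline passed first.
// `attempt` is always invoked at least once.
// Assumption: `attempt` blocks for its own short timeout (for example, a
// single flush call), so this loop does not spin the CPU in real use.
bool retry_until_deadline(const std::function<bool()>& attempt,
                          std::chrono::milliseconds budget) {
    const auto deadline = std::chrono::steady_clock::now() + budget;
    do {
        if (attempt()) {
            return true;
        }
    } while (std::chrono::steady_clock::now() < deadline);
    return false;
}
```

With a deadline in place, a broker that never becomes reachable produces a `false` result the caller can handle, instead of a hang.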

You can use wait_for_acks(timeout) so you don't block. See here.

accelerated avatar Oct 29 '18 14:10 accelerated

Yes, probably. But sync_produce doesn't use timeout'ed version of wait_for_acks, and I can't find the way to make it use wait_for_acks with timeout.

EvilBeaver avatar Oct 29 '18 14:10 EvilBeaver

Do you keep getting timeouts when flushing even though you shouldn't? I see timeouts when running tests against a local VM. I'm not sure why I get timeouts, given the connection shouldn't fail. I feel like there's some race condition in rdkafka that's triggered when you flush too quickly, probably before the handle connects to the brokers.

mfontanini avatar Oct 29 '18 14:10 mfontanini

I'm sending only one message via sync_produce to cloudkarafka.com, and it seems something is wrong with my network config or authentication. I expect sync_produce to fail after some time, but it does not. The loop at https://github.com/mfontanini/cppkafka/blob/master/include/cppkafka/utils/buffered_producer.h#L600 is infinite.

EvilBeaver avatar Oct 29 '18 14:10 EvilBeaver

> Yes, probably. But sync_produce doesn't use timeout'ed version of wait_for_acks, and I can't find the way to make it use wait_for_acks with timeout.

sync_produce() = produce() + wait_for_acks(), where no timeout means waiting indefinitely. You can therefore simulate sync_produce(timeout) as produce() + wait_for_acks(timeout).

accelerated avatar Oct 29 '18 15:10 accelerated
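
The produce() + wait_for_acks(timeout) combination suggested above can be sketched as a small helper. It is written as a template so it compiles without cppkafka. `sync_produce_with_timeout` is a hypothetical name, and the code assumes the producer type offers produce(message) plus a wait_for_acks(std::chrono::milliseconds) overload whose result converts to bool (true when all acks arrived in time). Check the BufferedProducer header of your cppkafka version before relying on that.

```cpp
#include <chrono>

// Hypothetical helper: produce one message, then wait a bounded time for acks.
// Assumptions about Producer (verify against your cppkafka version):
//   - producer.produce(message) enqueues the message
//   - producer.wait_for_acks(timeout) returns something convertible to bool,
//     true if every outstanding ack arrived before the timeout
template <typename Producer, typename Message>
bool sync_produce_with_timeout(Producer& producer, const Message& message,
                               std::chrono::milliseconds timeout) {
    producer.produce(message);
    return producer.wait_for_acks(timeout);
}
```

Under those assumptions, a call would look like `sync_produce_with_timeout(producer, builder, std::chrono::seconds(5))`, with a `false` result meaning the message was not acknowledged in time and the caller decides whether to retry or give up.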