producer.flush() ?
If you try the simple producer example from your front page without producer.flush():
```cpp
#include <cppkafka/cppkafka.h>

using namespace std;
using namespace cppkafka;

int main() {
    // Create the config
    Configuration config = {
        { "metadata.broker.list", "127.0.0.1:9092" }
    };

    // Create the producer
    Producer producer(config);

    // Produce a message!
    string message = "hey there!";
    producer.produce(MessageBuilder("my_topic").partition(0).payload(message));

    // producer.flush();
}
```
And don't call producer.flush() at the end.
Then create one consumer with group id 0, consume one message, and kill it.
Now create a second consumer with group id 0; it will take a few minutes before this one gets its partitions assigned.
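For concreteness, each consumer in the steps above looks roughly like this (the group id 0 and broker address come from the description; the topic name is assumed to be the one from the producer snippet):

```cpp
#include <cppkafka/cppkafka.h>
#include <iostream>

using namespace std;
using namespace cppkafka;

int main() {
    Configuration config = {
        { "metadata.broker.list", "127.0.0.1:9092" },
        { "group.id", "0" }
    };

    // Create the consumer and join the group
    Consumer consumer(config);
    consumer.subscribe({ "my_topic" });

    // Poll until a single message arrives, print it, then exit ("kill it")
    while (true) {
        Message msg = consumer.poll();
        if (msg && !msg.get_error()) {
            cout << msg.get_payload() << endl;
            break;
        }
    }
}
```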
It looks like calling producer.flush() is very important, but you haven't done it in your example code elsewhere?
True, there should be some comment regarding flushing the producer. I'll add that now, thanks.
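For reference, the example with the flush added would look roughly like this (only the call at the end changes):

```cpp
#include <cppkafka/cppkafka.h>

using namespace std;
using namespace cppkafka;

int main() {
    // Create the config
    Configuration config = {
        { "metadata.broker.list", "127.0.0.1:9092" }
    };

    // Create the producer
    Producer producer(config);

    // Produce a message!
    string message = "hey there!";
    producer.produce(MessageBuilder("my_topic").partition(0).payload(message));

    // Block until the message has actually been delivered to the broker;
    // otherwise it may still be sitting in the internal queue when the
    // producer is destroyed and never reach the broker.
    producer.flush();
}
```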
What about sync and async producing? librdkafka has a dedicated wiki page about this: https://github.com/edenhill/librdkafka/wiki/Sync-producer
How should a sync producer be implemented using cppkafka?
You can use the BufferedProducer class and call BufferedProducer::sync_produce. Under the hood this does something similar to what the rdkafka wiki entry describes.
I normally use this class because it has a smarter flushing mechanism than Producer, which simply calls rd_kafka_flush.
@mfontanini can you give a small example of how to use BufferedProducer and BufferedProducer::sync_produce()?
Did you try reading the documentation? It's pretty self-explanatory. See the constructor and BufferedProducer::sync_produce. There's also this example which uses the buffered producer (it does async production, but the changes are minimal).
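In case it helps, a minimal sync producer along those lines looks roughly like this (the broker address and topic name are just placeholders):

```cpp
#include <cppkafka/cppkafka.h>
#include <cppkafka/utils/buffered_producer.h>
#include <string>

using namespace std;
using namespace cppkafka;

int main() {
    Configuration config = {
        { "metadata.broker.list", "127.0.0.1:9092" }
    };

    // The template parameter is the buffer type used for buffered payloads
    BufferedProducer<string> producer(config);

    // sync_produce sends the message and waits for the broker acks before returning
    string message = "hey there!";
    producer.sync_produce(MessageBuilder("my_topic").partition(0).payload(message));
}
```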
It seems that the buffered producer falls into an endless loop when there are connection timeouts.
https://github.com/mfontanini/cppkafka/blob/master/include/cppkafka/utils/buffered_producer.h#L600
I don't know why, but producer_.flush() in this code always times out, which leads to an infinite loop. Why does flush keep timing out?
You can use wait_for_acks(timeout) so you don't block. See here.
Yes, probably. But sync_produce doesn't use the timed version of wait_for_acks, and I can't find a way to make it use wait_for_acks with a timeout.
Do you keep getting timeouts when flushing even though you shouldn't? I see timeouts when running tests against a local VM. I'm not sure why I get timeouts, given the connection shouldn't fail. I feel like there's some race condition in rdkafka that's triggered when you flush too quickly, probably before the handle connects to the brokers.
I'm sending only one message via sync_produce to cloudkarafka.com, and it seems something is wrong with my network config or authentication. I expect sync_produce to fail after some time, but it does not. The loop in https://github.com/mfontanini/cppkafka/blob/master/include/cppkafka/utils/buffered_producer.h#L600 is infinite.
> Yes, probably. But sync_produce doesn't use the timed version of wait_for_acks, and I can't find a way to make it use wait_for_acks with a timeout.
sync_produce() = produce() + wait_for_acks() (no timeout here means it waits indefinitely)
therefore you can simulate it with
sync_produce(timeout) = produce() + wait_for_acks(timeout)
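A rough sketch of that simulation as a small helper; the function name is made up, and I'm assuming the timed wait_for_acks overload reports whether all acks arrived in time:

```cpp
#include <cppkafka/cppkafka.h>
#include <cppkafka/utils/buffered_producer.h>
#include <chrono>
#include <string>

using namespace std;
using namespace cppkafka;

// Hypothetical helper: behaves like sync_produce, but gives up after `timeout`
// instead of waiting for acks forever.
bool sync_produce_with_timeout(BufferedProducer<string>& producer,
                               const MessageBuilder& builder,
                               chrono::milliseconds timeout) {
    // Produce the message without buffering it
    producer.produce(builder);
    // Wait for the acks, but only up to `timeout`, so a broker that never
    // comes up no longer blocks indefinitely
    return producer.wait_for_acks(timeout);
}
```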