Unexpected behaviour when using multiple async producers
Description
When sending multiple messages asynchronously through more than one Producer, the wrong messages are sent to the Kafka broker. More specifically, instead of the latest message, the previous one is sent.
Testing shows that the Producer's callback is called more than once, and that the old callback is called in place of the newly defined one.
Library versions tested: master-a8f5b0, 0.2.0.6, 0.2.0.8
Steps to reproduce
Run the following script:
<?php
// Composer autoloader (assumes the library was installed with Composer)
require 'vendor/autoload.php';

function sendMsg($msg) {
    // ProducerConfig is a singleton: every call to sendMsg() reconfigures the same instance
    $config = \Kafka\ProducerConfig::getInstance();
    $config->setMetadataRefreshIntervalMs(14000);
    $config->setMetadataBrokerList('localhost:9092');
    $config->setRequiredAck(1);
    $config->setIsAsyn(false);
    $config->setProduceInterval(500);

    $count = 0;
    // Message callback: prints its label and how many times it has been invoked
    $producer = new \Kafka\Producer(
        function () use ($msg, &$count) {
            echo "$msg $count \n";
            $count += 1;
            return [
                [
                    'topic' => 'test',
                    'value' => 'test....message.',
                    'key' => 'testkey',
                ],
            ];
        }
    );
    $producer->success(function ($result) {
        echo " success \n";
        //var_dump($result);
    });
    $producer->error(function ($errorCode) use ($producer) {
        echo "ERROR: \n";
        var_dump($errorCode);
    });
    $producer->send(true);
}

for ($i = 0; $i < 10; $i += 1) {
    sendMsg("msg-$i");
}
Expected result:
msg-0 0
msg-1 0
msg-2 0
msg-3 0
msg-4 0
msg-5 0
msg-6 0
. . .
Actual result:
msg-0 0
msg-0 1
msg-0 2
msg-0 3
msg-0 4
msg-5 0
msg-6 0
msg-6 1
. . .
Note: the exact output varies from run to run, but the bug always reproduces.
@lvrach wow, that's really weird. I'd assume it's caused by the shared singletons and the nature of the "async" processing: the library was not designed to create producers inside a loop, and this is the result.
This is a nasty bug, but it's a side effect of the design decisions here =/ I'm trying to improve the connection management and solve this once and for all, but I'm kind of overwhelmed with everything on my plate right now 😞
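The suspected cause is shared singleton state. As a hypothetical illustration only (the `SharedConfig` class and `configure()` function below are invented and are not kafka-php's implementation; the real mechanism behind the stale callback is not established in this thread), this shows how a `getInstance()`-style singleton carries state from one loop iteration into the next, so each call to a setup function like `sendMsg()` does not get an isolated object:

```php
<?php
// Hypothetical stand-in for a process-wide singleton such as
// \Kafka\ProducerConfig::getInstance(); NOT the library's actual code.
final class SharedConfig
{
    private static $instance = null;

    /** @var string[] one entry per caller that configured the instance */
    public $setBy = [];

    public static function getInstance(): SharedConfig
    {
        if (self::$instance === null) {
            self::$instance = new SharedConfig();
        }
        return self::$instance;
    }
}

// Mirrors the loop in the reproduction script: each call "configures" what
// looks like a fresh object, but every call receives the same instance.
function configure(string $msg): SharedConfig
{
    $config = SharedConfig::getInstance();
    $config->setBy[] = $msg;
    return $config;
}

$first = configure('msg-0');
$second = configure('msg-1');

var_dump($first === $second);             // bool(true)
echo implode(', ', $second->setBy), "\n"; // msg-0, msg-1
```

Whatever the first iteration leaves behind in such an object is still there when the second iteration runs, which is consistent with the maintainer's point that the library was not designed for producers created inside a loop.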
@lcobucci thank you for the quick response.
I simply want to send multiple messages to Kafka asynchronously. Could you recommend a better way to do this with this library?
@lvrach I'd recommend putting the message-building logic in the callback, the way we do in the functional tests.
We could make some minor changes to allow you to use a generator, but for now the callback must return an array.
So your snippet would look like this:
<?php
// Composer autoloader (assumes the library was installed with Composer)
require 'vendor/autoload.php';

function sendMsgs(callable $messages) {
    $config = \Kafka\ProducerConfig::getInstance();
    $config->setMetadataRefreshIntervalMs(14000);
    $config->setMetadataBrokerList('localhost:9092');
    $config->setRequiredAck(1);
    $config->setIsAsyn(false);
    $config->setProduceInterval(500);

    // A single Producer whose callback returns every message as one batch
    $producer = new \Kafka\Producer($messages);
    $producer->success(function ($result) {
        echo " success \n";
        //var_dump($result);
    });
    $producer->error(function ($errorCode) {
        echo "ERROR: \n";
        var_dump($errorCode);
    });
    $producer->send(true);
}

// Builds all ten messages in one invocation and returns them as an array
$messageCreation = function (): array {
    $messages = [];
    for ($i = 0; $i < 10; ++$i) {
        $messages[] = [
            'topic' => 'test',
            'value' => 'test....message.',
            'key' => 'testkey',
        ];
        echo "msg-$i \n";
    }
    return $messages;
};

sendMsgs($messageCreation);
You can use any callable syntax to pass the function that generates the messages to be sent.
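To illustrate, here is a minimal sketch of callables that fit the callback contract described in this thread (no arguments, returns an array of message arrays with `topic`, `value` and `key`). The names `buildMessages`, `MessageFactory`, `generateMessages` and `$fromGenerator` are hypothetical, and only standard PHP is used (`call_user_func`, `iterator_to_array`); nothing here is kafka-php API. The third variant shows one way to keep writing the messages with a generator while still returning the array the callback currently requires:

```php
<?php
// Hypothetical callables; any of them could be passed to sendMsgs() from the
// snippet in this thread. Each returns an array of message arrays.

// 1. A named function, passed by name as the string 'buildMessages'.
function buildMessages(): array
{
    return [
        ['topic' => 'test', 'value' => 'msg-0', 'key' => 'testkey'],
    ];
}

// 2. A static method, passed as [MessageFactory::class, 'create'].
final class MessageFactory
{
    public static function create(): array
    {
        return [
            ['topic' => 'test', 'value' => 'msg-0', 'key' => 'testkey'],
        ];
    }
}

// 3. A closure that builds the messages with a generator, then converts them
//    into the plain array the callback currently has to return.
function generateMessages(int $n): \Generator
{
    for ($i = 0; $i < $n; ++$i) {
        yield ['topic' => 'test', 'value' => "msg-$i", 'key' => 'testkey'];
    }
}

$fromGenerator = function (): array {
    // false = ignore the generator's keys and build a plain list
    return iterator_to_array(generateMessages(10), false);
};

$callables = ['buildMessages', [MessageFactory::class, 'create'], $fromGenerator];

foreach ($callables as $callable) {
    // Print how many messages each callable produces
    echo count(call_user_func($callable)), "\n";
}
```

The loop prints 1, 1 and 10. Note that `iterator_to_array()` materialises the whole batch in memory, so this keeps the generator style for building messages but does not make sending lazy.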