kafkajs
kafkajs copied to clipboard
v1.16.0 - Lossing data when one node of the cluster is lost
Describe the bug I open this issue to clarify if what I see is correct and must be solve by the user in some way, or is a bug. The specific configuration that i have tested:
- 3 nodes of Kafka v3.0 based on
confluentinc/cp-kafka
docker images. All of them with this configuration:
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_OFFSETS_COMMIT_REQUIRED_ACKS: -1
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
- A single producer that try to write 60 records every 5 seconds with
send
method. The configuration of this producer is:- Client:
{ clientId: 'creator', brokers: ['localhost:9092', 'localhost:9093', 'localhost:9093'], enforceRequestTimeout: true, requestTimeout: 3000, connectionTimeout: 3000, }
- Producer:
{ maxInFlightRequests: 1, transactionalId: 'my-transactional-producer', idempotent: true, transactionTimeout: 3000, retry: { retries: Number.MAX_SAFE_INTEGER }, }
I log every job (the result of a send process) and in the normal operation I can see something like:
The size of the test is: 60
{"level":"ERROR","timestamp":"2022-02-28T09:28:59.133Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"localhost:9092","clientId":"creator","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":2,"size":123}
[
{
"topicName": "myTopic",
"partition": 0,
"errorCode": 0,
"baseOffset": "0",
"logAppendTime": "-1",
"logStartOffset": "0"
},
{
"topicName": "myTopic",
"partition": 1,
"errorCode": 0,
"baseOffset": "0",
"logAppendTime": "-1",
"logStartOffset": "0"
},
{
"topicName": "myTopic",
"partition": 2,
"errorCode": 0,
"baseOffset": "0",
"logAppendTime": "-1",
"logStartOffset": "0"
}
]
OPERATION 1 - 60/600
dbsave: 1.090s
The size of the test is: 60
[
{
"topicName": "myTopic",
"partition": 0,
"errorCode": 0,
"baseOffset": "20",
"logAppendTime": "-1",
"logStartOffset": "0"
},
{
"topicName": "myTopic",
"partition": 1,
"errorCode": 0,
"baseOffset": "20",
"logAppendTime": "-1",
"logStartOffset": "0"
},
{
"topicName": "myTopic",
"partition": 2,
"errorCode": 0,
"baseOffset": "20",
"logAppendTime": "-1",
"logStartOffset": "0"
}
]
OPERATION 2 - 120/600
dbsave: 25.286ms
In the middle of the test I stop one of the servers (not the controller of the cluster) with docker stop 85fsa...
, and in the most of cases, not always, this is the log that i receive:
The size of the test is: 60
{"level":"ERROR","timestamp":"2022-02-28T09:29:09.132Z","logger":"kafkajs","message":"[Producer] Failed to send messages: Closed connection","retryCount":0,"retryTime":243}
[
{
"topicName": "myTopic",
"partition": 0,
"errorCode": 0,
"baseOffset": "40",
"logAppendTime": "-1",
"logStartOffset": "0"
},
{
"topicName": "myTopic",
"partition": 2,
"errorCode": 0,
"baseOffset": "40",
"logAppendTime": "-1",
"logStartOffset": "0"
}
]
OPERATION 3 - 180/600
dbsave: 1.262s
The size of the test is: 60
[
{
"topicName": "myTopic",
"partition": 0,
"errorCode": 0,
"baseOffset": "60",
"logAppendTime": "-1",
"logStartOffset": "0"
},
{
"topicName": "myTopic",
"partition": 1,
"errorCode": 0,
"baseOffset": "40",
"logAppendTime": "-1",
"logStartOffset": "0"
},
{
"topicName": "myTopic",
"partition": 2,
"errorCode": 0,
"baseOffset": "60",
"logAppendTime": "-1",
"logStartOffset": "0"
}
]
OPERATION 4 - 240/600
dbsave: 9.988ms
As you can see the send to one of the servers failed, but the send
method resolved with the response of only 2 server, and the send its not retried.
In the last job process you can see that the final result is that we have lost 20 messages without warning from the library:
The size of the test is: 60
[
{
"topicName": "myTopic",
"partition": 0,
"errorCode": 0,
"baseOffset": "180",
"logAppendTime": "-1",
"logStartOffset": "0"
},
{
"topicName": "myTopic",
"partition": 1,
"errorCode": 0,
"baseOffset": "160",
"logAppendTime": "-1",
"logStartOffset": "0"
},
{
"topicName": "myTopic",
"partition": 2,
"errorCode": 0,
"baseOffset": "180",
"logAppendTime": "-1",
"logStartOffset": "0"
}
]
OPERATION 10 - 600/600
dbsave: 7.462ms
The snipped of code that I use to reproduce this behaviour is:
import { Kafka, Message } from 'kafkajs';
import fs from 'fs';
var access = fs.createWriteStream('test.log');
//@ts-ignore ignore is a test
process.stdout.write = process.stderr.write = access.write.bind(access);
function createMessages(from: number, to: number): Message[] {
const messages: Message[] = [];
for (let index = from; index < to; index += 1) {
messages.push({ value: JSON.stringify({ index }) });
}
return messages;
}
const client = new Kafka({
clientId: 'creator',
brokers: ['localhost:9092', 'localhost:9093', 'localhost:9093'],
enforceRequestTimeout: true,
requestTimeout: 3000,
connectionTimeout: 3000,
});
const producer = client.producer({
maxInFlightRequests: 1,
transactionalId: 'my-transactional-producer',
idempotent: true,
transactionTimeout: 3000,
retry: { retries: Number.MAX_SAFE_INTEGER },
});
const consumer = client.consumer({ groupId: 'myGroup' });
(async () => {
await producer.connect();
await consumer.connect();
})();
let pending = false;
let consuming = false;
let count = 0;
let aggregatedValue = 0;
let messagesCount = 0;
const messageBatchSize = 60;
const messageFinalAmount = 60 * 10;
const expectedResult = (messageFinalAmount * (messageFinalAmount - 1)) / 2;
console.log(`The final result should be ${expectedResult}`);
setInterval(async () => {
if (count === messageFinalAmount / messageBatchSize && !consuming) {
await consumer.subscribe({ topic: 'myTopic', fromBeginning: true });
await consumer.run({
eachMessage: async ({ message }) => {
if (!message.value) {
throw new Error(`TEST IS BROKEN`);
}
const value = JSON.parse(message.value.toString()).index;
aggregatedValue = aggregatedValue + value;
console.log(
`${value} - ${aggregatedValue} - ${
aggregatedValue === expectedResult ? 'ALL MESSAGE HAS BEEN RECEIVED' : 'PENDING ...'
}`
);
},
});
consuming = true;
return;
}
if (pending || consuming) {
return;
}
const messages = createMessages(messagesCount, messagesCount + messageBatchSize);
console.log(`The size of the test is: ${messages.length}`);
console.time('dbsave');
pending = true;
try {
const result = await producer.send({ topic: 'myTopic', acks: -1, messages, timeout: 3000 });
console.log(JSON.stringify(result, null, 2));
count++;
messagesCount += messageBatchSize;
console.log(`OPERATION ${count} - ${messagesCount}/${messageFinalAmount}`);
console.timeEnd('dbsave');
pending = false;
} catch (error) {
console.log(`ERROR IN OPERATION ${error.message}`);
console.timeEnd('dbsave');
pending = false;
}
}, 5000);
And the docker-compose file:
version: '3.8'
services:
zookeeper-1:
image: confluentinc/cp-zookeeper:latest
ports:
- '32181:32181'
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
kafka-1:
image: confluentinc/cp-kafka:latest
ports:
- '9092:9092'
depends_on:
- zookeeper-1
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_OFFSETS_COMMIT_REQUIRED_ACKS: -1
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
kafka-2:
image: confluentinc/cp-kafka:latest
ports:
- '9093:9093'
depends_on:
- zookeeper-1
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_OFFSETS_COMMIT_REQUIRED_ACKS: -1
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
kafka-3:
image: confluentinc/cp-kafka:latest
ports:
- '9094:9094'
depends_on:
- zookeeper-1
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_OFFSETS_COMMIT_REQUIRED_ACKS: -1
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
kowl:
image: quay.io/cloudhut/kowl:master
depends_on:
- zookeeper-1
- kafka-1
- kafka-2
- kafka-3
environment:
KAFKA_BROKERS: kafka-1:29092,kafka-2:29093, kafka-3:29094
ports:
- "8080:8080"
Expected behavior I expect the method rejects with an error in order to retry the operations by my shelve or try to find the server that its the responsible of the partition now.
Observed behavior
Even one of the servers fails in the middle of the process of sending new records to the cluster, the send
method resolves without error and the data is lost.
Environment:
- OS: Linux Ubuntu 20.04
- KafkaJS version: 1.16.0 and 1.15.0
- Kafka version: 3.0
- NodeJS version: 14.18.2
Hi there,
I've debugged a little bit more the issue, I don't know the code enough well, but I think I have found where is the problem.
In the sendMessages file, the responses of the brokers is tracked by the Map
responsePerBroker
.
https://github.com/tulios/kafkajs/blob/1876abcb5effd6183c1fbaa50dee9cef269b67a5/src/producer/sendMessages.js#L20-L26
If the request is fulfilled, the response is setted, but if there is a failure in the response, the response from the broker is deleted.
https://github.com/tulios/kafkajs/blob/1876abcb5effd6183c1fbaa50dee9cef269b67a5/src/producer/sendMessages.js#L116-L121
This logic is okey when you have a problem in the request but the broker is still there. For that cases where the broker is lost (from 3 brokers in a cluster to 2), in the next retry of the createProducerRequests
no request will be performed, due too right now we have only 2 brokers and both of them has an answer in the map responsePerBroker
(brokersWithoutResponse
will be empty).
https://github.com/tulios/kafkajs/blob/1876abcb5effd6183c1fbaa50dee9cef269b67a5/src/producer/sendMessages.js#L65-L69
So, the Promise.all()
will be resolved without any code execution and the sendMessages
function will return the result of the previous iteration.
https://github.com/tulios/kafkajs/blob/1876abcb5effd6183c1fbaa50dee9cef269b67a5/src/producer/sendMessages.js#L132-L136
I think this logic should be based in partitiions instead of brokers.
Let me know your thoughts
This is clearly a bug.
I think this logic should be based in partitiions instead of brokers.
So if your analysis is correct, then I suppose what's needed is to during retries detect that we still have outstanding partitions to produce to, check who the new leader for that partition is and then issue new requests.
I'm not sure about the cluster behavior during this operation though. If it's a graceful shutdown of a broker in the cluster, I would expect leadership to change to one of the other brokers, in which case the above approach should work, but if it's an ungraceful shutdown I'm not sure that this will make a difference because the automatic leadership rebalance only kicks in after 5 minutes by default. Regardless, at least we should be able to throw a meaningful error in the case we can't find a leader for the partition, rather than silently fail as is the case currently.
Hi @Nevon,
As you say there will be a complete different behaviour depending on the server "shutdown style". A possible solution is to change the resolve response of the send function adding a list the messages that were commited and not, in that way the user can retry to send the uncommited messages by himself. In the second try, if the cluster is able to receive the messages (even with one server less), you will be able to send all the messages or receive a new error with a clear message of the actual problem.
This is the way that some SDKs use to solve this kind of issues.
For example this is the response of AWS SDK for Kinesis Client or Firehose Client:
{
"FailedRecordCount": 2,
"Records": [
{
"SequenceNumber": "49543463076548007577105092703039560359975228518395012686",
"ShardId": "shardId-000000000000"
},
{
"ErrorCode": "ProvisionedThroughputExceededException",
"ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
},
{
"ErrorCode": "InternalFailure",
"ErrorMessage": "Internal service failure."
}
]
}
Of course we must the select the correct format to be sure that we don't broke any previous implementation.
I think if we have any failures that we couldn't handle, we should reject producer.send
. We have similar situations with other operations that do multiple things towards multiple brokers, where the operation can partially fail. For example, admin.createTopics
can succeed for one topic and fail for another. #1104 is a good example of how we handle that. I think we could do something similar for this.
If the producer.send
is rejected, the user will try to retry the complete "job", so in the most of the cases some messages will be commited twice, something that it's not the ideal situation.
Check it out, this issue has come up before #43. Let's use that instead.
Actually, nevermind, this deserves its own issue. Part of it is to communicate the partial failure better to the user, but another part is handling the error to begin with. Realized this one millisecond after closing.
As extended error classes are currently be using in the code, we have to decide if the correct approach is to resolve with more info or maybe, reject with a special error.
At the same time, we should use Promise.allSettled()
instead of Promise.all()
, to be sure that we control which messages were commited, here we need to take into account that Nodejs 12.9.0
will be necessary.
There is a all settled() implementation in /utils.
On Thu, 10 Mar 2022, 23:09 Carlos Jiménez Saiz, @.***> wrote:
As extended error classes are currently be using in the code, we have to decide if the correct approach is to resolve with more info or maybe, reject with a special error.
At the same time, we should use Promise.allSettled()instead of Promise.all(), to be sure that we control which messages were commited, here we need to take into account that Nodejs 12.9.0 will be necessary.
— Reply to this email directly, view it on GitHub https://github.com/tulios/kafkajs/issues/1303#issuecomment-1064164351, or unsubscribe https://github.com/notifications/unsubscribe-auth/ABDLW5T2IF327FH7TXLIB7LU7IGD7ANCNFSM5PQWJ33A . You are receiving this because you are subscribed to this thread.Message ID: @.***>
Hi, We also faced this problem, and it is critical for us. I would like to fix it if you don't mind.
Hi, no problem from my side, I have sudden rush of work and I have deferred this task with no forecast date
Sorry, any movement on this? Sounds like something that might happen on any production system that uses this library
So basically this issue means that once a node is down (like when MSK clusters go through security patches, which are frequent) we may end up losing messages?
@eladchen Yes