Error trying to connect to kafka broker in confluent cloud "failed to dial: failed to open connection to <broker>.us-west-2.aws.confluent.cloud:9092 ... i/o timeout"
Hi, hopefully this is an obvious issue, but I could not figure it out. I get the following error when trying to connect to a Kafka broker running in Confluent Cloud:
ERRO[0010] Failed to create dialer., OriginalError: %!w(*fmt.wrapError=&{failed to dial: failed to open connection to xxx.us-west-2.aws.confluent.cloud:9092: dial tcp: lookup xxx.us-west-2.aws.confluent.cloud: i/o timeout 0xc0018b5ea0}) error="Failed to create dialer., OriginalError: %!w(*fmt.wrapError=&{failed to dial: failed to open connection to xxx.us-west-2.aws.confluent.cloud:9092: dial tcp: lookup xxx.us-west-2.aws.confluent.cloud: i/o timeout 0xc0018b5ea0})"
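The telling part of that message is `dial tcp: lookup <broker>: i/o timeout` — the DNS lookup itself times out before any Kafka or SASL handshake happens, which usually points at the resolver, a VPN/proxy, or a firewall rather than at k6 or the script. A quick way to test name resolution and TCP reachability from the same machine, outside of k6 (a sketch assuming a Linux-like shell with `getent` and bash's `/dev/tcp`; the hostname below is a placeholder):

```shell
# Check that the broker hostname resolves through the system resolver.
check_dns() {
    getent hosts "$1" > /dev/null 2>&1
}

# Check that a TCP connection to host:port succeeds within 5 seconds,
# using bash's /dev/tcp pseudo-device (no netcat required).
check_tcp() {
    timeout 5 bash -c "exec 3<>/dev/tcp/$1/$2" 2> /dev/null
}

# Example (replace with your actual bootstrap server):
# check_dns pkc-xxxxx.us-west-2.aws.confluent.cloud && echo "DNS ok" || echo "DNS failed"
# check_tcp pkc-xxxxx.us-west-2.aws.confluent.cloud 9092 && echo "TCP ok" || echo "TCP failed"
```

If the DNS check fails here too, the problem is in the environment, not in the k6 script.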
Here is my simple script (at this point I just want to make a connection).
import { check } from 'k6';
import { writer, produce, reader, consume, Connection, TLS_1_2 } from 'k6/x/kafka';
const saslConfig = {
    username: "<username>",
    password: "<password>",
    algorithm: "plain"
};

const connection = new Connection({
    address: "xxx.us-west-2.aws.confluent.cloud:9092",
    sasl: saslConfig
});

export default function () {
    // Fetch the list of all topics
    const topics = connection.listTopics();
    console.log(topics); // list of topics
}
Much appreciated.
Thanks @mostafa
I updated it to the following and still hit the same issue. I have the same config in our Java-based application, which has no problem connecting, by the way.
Am I still missing something here (compared with the config in the Java app below)?
import { check } from 'k6';
import { writer, produce, reader, consume, Connection, TLS_1_2, SASL_PLAIN } from 'k6/x/kafka';
const saslConfig = {
    username: "<username>",
    password: "<password>",
    algorithm: SASL_PLAIN
};

const tlsConfig = {
    enableTLS: true,
    insecureSkipTLSVerify: true,
    minVersion: TLS_1_2,
};

const connection = new Connection({
    address: "xxx.us-west-2.aws.confluent.cloud:9092",
    sasl: saslConfig,
    tls: tlsConfig
});

export default function () {
    // Fetch the list of all topics
    const topics = connection.listTopics();
    console.log(topics); // list of topics
}
Here is the config on the java side:
security.protocol: SASL_SSL
basic.auth.credentials.source: USER_INFO
ssl.endpoint.identification.algorithm: https
sasl.mechanism: PLAIN
request.timeout.ms: 20000
retry.backoff.ms: 500
sasl.jaas.config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
bootstrap.servers: xxx.us-west-2.aws.confluent.cloud:9092
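For comparison, those Java client settings map onto xk6-kafka's config roughly as follows — a sketch with placeholder credentials, not a verified working config. One detail worth double-checking: recent xk6-kafka examples spell the TLS fields `enableTls`/`insecureSkipTlsVerify`, while the scripts above use `enableTLS`/`insecureSkipTLSVerify`; the correct casing depends on the version in use.

```javascript
import { Connection, SASL_PLAIN, TLS_1_2 } from "k6/x/kafka";

// sasl.mechanism: PLAIN  +  the JAAS username/password
const saslConfig = {
    username: "<username>",
    password: "<password>",
    algorithm: SASL_PLAIN,
};

// security.protocol: SASL_SSL  ->  the SASL config above plus TLS enabled here
const tlsConfig = {
    enableTls: true, // field casing as used in recent xk6-kafka examples
    minVersion: TLS_1_2,
};

// bootstrap.servers
const connection = new Connection({
    address: "xxx.us-west-2.aws.confluent.cloud:9092",
    sasl: saslConfig,
    tls: tlsConfig,
});
```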
I'm still having the issue connecting to Confluent. Any help and assistance would be appreciated. Thank you! @mostafa
@Dwisf Here's what I did that worked for me:
- Created a trial Confluent Cloud account and in it, a basic cluster.
- Created a pair of API keys to use as SASL username and password.
- Created a topic manually, because I was getting a
`POLICY_VIOLATION` error when trying to create a topic using the script. I suppose this is unrelated to the tool and the script, and is rather a topic configuration issue that needs to be figured out and set while trying to create a topic:

      $ ./k6 run scripts/test_sasl_auth.js

                /\      |‾‾| /‾‾/   /‾‾/
           /\  /  \     |  |/  /   /  /
          /  \/    \    |     (   /   ‾‾\
         /          \   |  |\  \ |  (‾)  |
        / __________ \  |__| \__\ \_____/ .io

      ERRO[0001] Failed to create topic., OriginalError: %!w(kafka.Error=44)  error="Failed to create topic., OriginalError: %!w(kafka.Error=44)"
      ERRO[0001] Failed to create topic., OriginalError: %!w(kafka.Error=44)
              at github.com/mostafa/xk6-kafka.(*Kafka).connectionClass.func1 (native)
              at file:///home/mostafa/personal/xk6-kafka/scripts/test_sasl_auth.js:99:27(136)
              at native  hint="script exception"

- Modified the test_sasl_auth.js slightly to include the cluster config:

      import { check } from "k6";
      import {
          Writer,
          Reader,
          Connection,
          SchemaRegistry,
          SCHEMA_TYPE_JSON,
          SASL_PLAIN,
          TLS_1_2,
      } from "k6/x/kafka";

      export const options = {
          scenarios: {
              sasl_auth: {
                  executor: "constant-vus",
                  vus: 1,
                  duration: "10s",
                  gracefulStop: "1s",
              },
          },
      };

      const brokers = ["address.europe-west4.gcp.confluent.cloud:9092"];
      const topic = "xk6_kafka_json_topic";

      const saslConfig = {
          username: "sasl.username",
          password: "sasl.password",
          algorithm: SASL_PLAIN,
      };

      const tlsConfig = {
          enableTls: true,
          insecureSkipTlsVerify: true,
          minVersion: TLS_1_2,
      };

      const offset = 0;
      const partition = 0;
      const numPartitions = 1;
      const replicationFactor = 1;

      const writer = new Writer({
          brokers: brokers,
          topic: topic,
          sasl: saslConfig,
          tls: tlsConfig,
      });
      const reader = new Reader({
          brokers: brokers,
          topic: topic,
          partition: partition,
          offset: offset,
          sasl: saslConfig,
          tls: tlsConfig,
      });
      const connection = new Connection({
          address: brokers[0],
          sasl: saslConfig,
          tls: tlsConfig,
      });
      const schemaRegistry = new SchemaRegistry();

      if (__VU == 0) {
          connection.createTopic({
              topic: topic,
              numPartitions: numPartitions,
              replicationFactor: replicationFactor,
          });
          console.log("Existing topics: ", connection.listTopics(saslConfig, tlsConfig));
      }

      export default function () {
          for (let index = 0; index < 100; index++) {
              let messages = [
                  {
                      key: schemaRegistry.serialize({
                          data: {
                              correlationId: "test-id-abc-" + index,
                          },
                          schemaType: SCHEMA_TYPE_JSON,
                      }),
                      value: schemaRegistry.serialize({
                          data: {
                              name: "xk6-kafka",
                          },
                          schemaType: SCHEMA_TYPE_JSON,
                      }),
                  },
                  {
                      key: schemaRegistry.serialize({
                          data: {
                              correlationId: "test-id-def-" + index,
                          },
                          schemaType: SCHEMA_TYPE_JSON,
                      }),
                      value: schemaRegistry.serialize({
                          data: {
                              name: "xk6-kafka",
                          },
                          schemaType: SCHEMA_TYPE_JSON,
                      }),
                  },
              ];
              writer.produce({ messages: messages });
          }

          let messages = reader.consume({ limit: 10 });
          check(messages, {
              "10 messages returned": (msgs) => msgs.length == 10,
              "key is correct": (msgs) =>
                  schemaRegistry
                      .deserialize({ data: msgs[0].key, schemaType: SCHEMA_TYPE_JSON })
                      .correlationId.startsWith("test-id-"),
              "value is correct": (msgs) =>
                  schemaRegistry
                      .deserialize({ data: msgs[0].value, schemaType: SCHEMA_TYPE_JSON })
                      .name == "xk6-kafka",
          });
      }

      export function teardown(data) {
          writer.close();
          reader.close();
          connection.close();
      }

- And this is the result:
      ./k6 run scripts/test_sasl_auth.js

                /\      |‾‾| /‾‾/   /‾‾/
           /\  /  \     |  |/  /   /  /
          /  \/    \    |     (   /   ‾‾\
         /          \   |  |\  \ |  (‾)  |
        / __________ \  |__| \__\ \_____/ .io

      INFO[0001] Existing topics:  ["xk6_kafka_json_topic"]  source=console
        execution: local
           script: scripts/test_sasl_auth.js
           output: -

        scenarios: (100.00%) 1 scenario, 1 max VUs, 11s max duration (incl. graceful stop):
                 * sasl_auth: 1 looping VUs for 10s (gracefulStop: 1s)

      INFO[0002] Existing topics:  ["xk6_kafka_json_topic"]  source=console
      INFO[0012] Existing topics:  ["xk6_kafka_json_topic"]  source=console

      running (11.1s), 0/1 VUs, 120 complete and 0 interrupted iterations
      sasl_auth ✓ [======================================] 1 VUs  10s
      INFO[0013] Existing topics:  ["xk6_kafka_json_topic"]  source=console

           ✓ 10 messages returned
           ✓ key is correct
           ✓ value is correct

           █ teardown

           checks.........................: 100.00% ✓ 360   ✗ 0
           data_received..................: 0 B     0 B/s
           data_sent......................: 0 B     0 B/s
           iteration_duration.............: avg=82.62ms  min=144.06µs med=23.39ms max=1.19s    p(90)=92.23ms p(95)=499.58ms
           iterations.....................: 120     10.809305/s
           kafka.reader.dial.count........: 1       0.090078/s
           kafka.reader.dial.seconds......: avg=3.4ms    min=0s       med=0s      max=408.05ms p(90)=0s      p(95)=0s
           kafka.reader.error.count.......: 0       0/s
           kafka.reader.fetch_bytes.max...: 1000000 min=1000000  max=1000000
           kafka.reader.fetch_bytes.min...: 1       min=1        max=1
           kafka.reader.fetch_wait.max....: 200ms   min=200ms    max=200ms
           kafka.reader.fetch.bytes.......: 124 kB  11 kB/s
           kafka.reader.fetch.size........: 1153    103.859409/s
           kafka.reader.fetches.count.....: 3       0.270233/s
           kafka.reader.lag...............: 18196   min=3        max=18235
           kafka.reader.message.bytes.....: 65 kB   5.8 kB/s
           kafka.reader.message.count.....: 1201    108.183131/s
           kafka.reader.offset............: 1200    min=11       max=1200
           kafka.reader.queue.capacity....: 1       min=1        max=1
           kafka.reader.queue.length......: 1       min=1        max=1
           kafka.reader.read.seconds......: avg=67.56ms  min=0s   med=0s  max=7.63s   p(90)=0s p(95)=0s
           kafka.reader.rebalance.count...: 0       0/s
           kafka.reader.timeouts.count....: 2       0.180155/s
           kafka.reader.wait.seconds......: avg=1.57ms   min=0s   med=0s  max=69.24ms p(90)=0s p(95)=0s
           kafka.writer.acks.required.....: 0       min=0        max=0
           kafka.writer.async.............: 0.00%   ✓ 0     ✗ 12000
           kafka.writer.attempts.max......: 0       min=0        max=0
           kafka.writer.batch.bytes.......: 911 kB  82 kB/s
           kafka.writer.batch.max.........: 1       min=1        max=1
           kafka.writer.batch.size........: 12000   1080.930536/s
           kafka.writer.batch.timeout.....: 0s      min=0s       max=0s
           kafka.writer.error.count.......: 0       0/s
           kafka.writer.message.bytes.....: 1.8 MB  164 kB/s
           kafka.writer.message.count.....: 24000   2161.861071/s
           kafka.writer.read.timeout......: 0s      min=0s       max=0s
           kafka.writer.retries.count.....: 0       0/s
           kafka.writer.wait.seconds......: avg=0s  min=0s med=0s max=0s p(90)=0s p(95)=0s
           kafka.writer.write.count.......: 24000   2161.861071/s
           kafka.writer.write.seconds.....: avg=284.93µs min=10.99µs med=27.14µs max=434.11ms p(90)=62.27µs p(95)=74.09µs
           kafka.writer.write.timeout.....: 0s      min=0s       max=0s
           vus............................: 0       min=0        max=1
           vus_max........................: 1       min=1        max=1

- And this is the topic dashboard after running the test:
@Dwisf Have you managed to make this work?
@mostafa thanks for checking in. I still can't successfully connect.
I created a test cluster in Confluent Cloud, created a test topic, and produced some messages.
For some reason, k6 still can't read from the cluster.
import { check } from 'k6';
import { Reader, Connection, TLS_1_2, SASL_PLAIN } from 'k6/x/kafka';
const brokers = ["xxx.us-west2.gcp.confluent.cloud:9092"];
const topic = "test_topic";
const saslConfig_TEST = {
    username: "<XXX>",
    password: "<XXX>",
    algorithm: SASL_PLAIN
};

const tlsConfig = {
    enableTLS: true,
    insecureSkipTLSVerify: true,
    minVersion: TLS_1_2,
};

const reader = new Reader({
    brokers: brokers,
    topic: topic,
    groupID: "marketplace-contracts-prod-dwi-k6",
    sasl: saslConfig_TEST,
    tls: tlsConfig,
});

export const options = {
    scenarios: {
        test_kafka: {
            executor: "constant-vus",
            vus: 1,
            duration: "15s",
            gracefulStop: "1s",
            exec: 'test_kafka_scenario',
        },
    },
};

export function test_kafka_scenario() {
    let messages = reader.consume({ limit: 1 });
    check(messages, {
        "there are messages returned": (msgs) => msgs.length > 0,
    });
}

export function teardown(data) {
    reader.close();
}
Output:
/\ |‾‾| /‾‾/ /‾‾/
/\ / \ | |/ / / /
/ \/ \ | ( / ‾‾\
/ \ | |\ \ | (‾) |
/ __________ \ |__| \__\ \_____/ .io
execution: local
script: ./k6_test_kafka.js
output: -
scenarios: (100.00%) 1 scenario, 1 max VUs, 16s max duration (incl. graceful stop):
* test_kafka: 1 looping VUs for 15s (exec: test_kafka_scenario, gracefulStop: 1s)
ERRO[0016] Unable to read messages. error="Unable to read messages."
running (35.8s), 0/1 VUs, 0 complete and 1 interrupted iterations
test_kafka ✓ [======================================] 1 VUs 15s
WARN[0036] No script iterations finished, consider making the test duration longer
█ teardown
data_received........: 0 B 0 B/s
data_sent............: 0 B 0 B/s
iteration_duration...: avg=19.81s min=19.81s med=19.81s max=19.81s p(90)=19.81s p(95)=19.81s
vus..................: 0 min=0 max=1
vus_max..............: 1 min=1 max=1
Do you think there is an issue with how I installed the extension? Is there a way to confirm that I have the correct installation?
@Dwisf
Set connectLogger to true on the Reader config to see what's going on underneath. Also, if you can produce messages, you should be able to consume them too.
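For illustration, a minimal Reader with the connection logger turned on might look like this (a sketch: the broker, topic, and credentials are placeholders, and the TLS field casing follows recent xk6-kafka examples):

```javascript
import { Reader, SASL_PLAIN, TLS_1_2 } from "k6/x/kafka";

const reader = new Reader({
    brokers: ["xxx.us-west2.gcp.confluent.cloud:9092"],
    topic: "test_topic",
    connectLogger: true, // log dial/auth activity from the underlying kafka-go client
    sasl: { username: "<username>", password: "<password>", algorithm: SASL_PLAIN },
    tls: { enableTls: true, minVersion: TLS_1_2 },
});
```

With this enabled, the reader's dial attempts and failures show up as INFO lines in the k6 output.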
@mostafa - thank you for your response.
I produce messages using a different tool/script, by the way.
Setting connectLogger produces the following output; it seems to be the same issue:
scenarios: (100.00%) 1 scenario, 1 max VUs, 16s max duration (incl. graceful stop):
* test_kafka: 1 looping VUs for 15s (exec: test_kafka_scenario, gracefulStop: 1s)
INFO[0000] initializing kafka reader for partition 0 of test_topic starting at offset first offset
INFO[0010] kafka reader failed to read lag of partition 0 of test_topic: failed to dial: failed to open connection to pkc-123123.us-west2.gcp.confluent.cloud:9092: dial tcp: lookup pkc-123123.us-west2.gcp.confluent.cloud: i/o timeout 0 interrupted iterations
INFO[0010] error initializing the kafka reader for partition 0 of test_topic: failed to dial: failed to open connection to pkc-123123.us-west2.gcp.confluent.cloud:9092: dial tcp: lookup pkc-123123.us-west2.gcp.confluent.cloud: i/o timeout
INFO[0010] initializing kafka reader for partition 0 of test_topic starting at offset first offset
ERRO[0016] Unable to read messages. error="Unable to read messages."
....
I did this to install the extension:
go install go.k6.io/xk6/cmd/xk6@latest
xk6 build --with github.com/mostafa/xk6-kafka@latest
and run k6 as follows:
k6 run ./k6_test_kafka.js
@Dwisf There's an open issue on kafka-go project related to this specific error. See if there's anything there that can help you, like https://github.com/segmentio/kafka-go/issues/726#issuecomment-912127614:
- https://github.com/segmentio/kafka-go/issues/726
Thank you @mostafa - interesting that the timeout value (which defaults to 10s) can be the issue. I'm looking forward to the fix being available. I wonder how I would incorporate the fix when it is available - does xk6-kafka have to be patched first?
For that, I need to update the dependency and make a new release, which I'll do quickly.
You can also test the changes locally using the replace directive in go.mod. For that, you need a copy of kafka-go with the changes (check out the branch), and then replace the package URI with the path to kafka-go on your local machine.
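Concretely, the go.mod of a local xk6-kafka checkout would gain a line like the one below (a sketch; the local path is a placeholder), after which the binary can be rebuilt from that checkout with `xk6 build --with github.com/mostafa/xk6-kafka=.`:

```
// xk6-kafka/go.mod
module github.com/mostafa/xk6-kafka

// Point the dependency at a local kafka-go checkout that contains the fix.
replace github.com/segmentio/kafka-go => /path/to/local/kafka-go
```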
@Dwisf The PR is merged. Can you clone the projects and test the code with your script?
@mostafa - unfortunately, it looks like it's still not working. I wonder if anyone else has the same problem I'm currently facing when trying to connect to Confluent Cloud.
I cloned kafka-go and xk6-kafka locally and built them (replacing the reference to kafka-go in go.mod with the clone):
replace github.com/segmentio/kafka-go => /tools/kafka-go
then used the built k6 to run the script.
Results:
/\ |‾‾| /‾‾/ /‾‾/
/\ / \ | |/ / / /
/ \/ \ | ( / ‾‾\
/ \ | |\ \ | (‾) |
/ __________ \ |__| \__\ \_____/ .io
execution: local
script: ./k6_test_kafka.js
output: -
scenarios: (100.00%) 1 scenario, 1 max VUs, 1m6s max duration (incl. graceful stop):
* test_kafka: 1 looping VUs for 1m5s (exec: test_kafka_scenario, gracefulStop: 1s)
INFO[0000] initializing kafka reader for partition 0 of test_topic starting at offset first offset
INFO[0010] error initializing the kafka reader for partition 0 of test_topic: failed to dial: failed to open connection to xxx.us-west2.gcp.confluent.cloud:9092: dial tcp: lookup xxx.us-west2.gcp.confluent.cloud: i/o timeout interrupted iterations
INFO[0010] kafka reader failed to read lag of partition 0 of test_topic: failed to dial: failed to open connection to xxx.us-west2.gcp.confluent.cloud:9092: dial tcp: lookup xxx.us-west2.gcp.confluent.cloud: i/o timeout
INFO[0010] initializing kafka reader for partition 0 of test_topic starting at offset first offset
INFO[0016] error initializing the kafka reader for partition 0 of test_topic: could not successfully authenticate to xxx.us-west2.gcp.confluent.cloud:9092 with SASL: [58] SASL Authentication Failed: SASL Authentication failed and 0 interrupted iterations
INFO[0016] initializing kafka reader for partition 0 of test_topic starting at offset first offset
INFO[0026] error initializing the kafka reader for partition 0 of test_topic: failed to dial: failed to open connection to xxx.us-west2.gcp.confluent.cloud:9092: dial tcp: lookup xxx.us-west2.gcp.confluent.cloud: i/o timeout interrupted iterations
INFO[0027] initializing kafka reader for partition 0 of test_topic starting at offset first offset
ERRO[0037] Unable to read messages. error="Unable to read messages."
INFO[0037] Message length: 0 source=console
INFO[0037] [] source=console
INFO[0038] initializing kafka reader for partition 0 of test_topic starting at offset first offset
ERRO[0048] Unable to read messages. error="Unable to read messages."
INFO[0048] Message length: 0 source=console
INFO[0048] [] source=console
INFO[0049] initializing kafka reader for partition 0 of test_topic starting at offset first offset
ERRO[0059] Unable to read messages. error="Unable to read messages."
INFO[0059] Message length: 0 source=console
INFO[0059] [] source=console
INFO[0060] initializing kafka reader for partition 0 of test_topic starting at offset first offset
ERRO[0066] Unable to read messages. error="Unable to read messages."
running (1m06.0s), 0/1 VUs, 3 complete and 1 interrupted iterations
test_kafka ✓ [======================================] 1 VUs 1m5s
✗ there are messages returned
↳ 0% — ✓ 0 / ✗ 3
█ teardown
checks.........................: 0.00% ✓ 0 ✗ 3
data_received..................: 0 B 0 B/s
data_sent......................: 0 B 0 B/s
iteration_duration.............: avg=14.78s min=3.6µs med=11s max=37.13s p(90)=29.29s p(95)=33.21s
iterations.....................: 3 0.045452/s
kafka.reader.dial.count........: 6 0.090904/s
kafka.reader.dial.seconds......: avg=9.64s min=8.93s med=10s max=10s p(90)=10s p(95)=10s
kafka.reader.error.count.......: 4 0.060603/s
kafka.reader.fetch_bytes.max...: 1000000 min=1000000 max=1000000
kafka.reader.fetch_bytes.min...: 1 min=1 max=1
kafka.reader.fetch_wait.max....: 200ms min=200ms max=200ms
kafka.reader.fetch.bytes.......: 0 B 0 B/s
kafka.reader.fetch.size........: 0 0/s
kafka.reader.fetches.count.....: 0 0/s
kafka.reader.lag...............: 0 min=0 max=0
kafka.reader.message.bytes.....: 0 B 0 B/s
kafka.reader.message.count.....: 0 0/s
kafka.reader.offset............: 0 min=0 max=0
kafka.reader.queue.capacity....: 1 min=1 max=1
kafka.reader.queue.length......: 0 min=0 max=0
kafka.reader.read.seconds......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka.reader.rebalance.count...: 0 0/s
kafka.reader.timeouts.count....: 0 0/s
kafka.reader.wait.seconds......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
vus............................: 1 min=1 max=1
vus_max........................: 1 min=1 max=1
@Dwisf
I created this branch for your testing. Try playing with the readBatchTimeout and the maxWait in the Reader config object. I just saw that the maxWait is set to 200ms, which is clearly very low.
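For illustration, a Reader with both of those knobs raised might look like the sketch below. The field names follow the xk6-kafka Reader config mentioned above, but the exact value format (duration string vs. nanoseconds) varies between versions, so treat the values as assumptions to verify against the installed version:

```javascript
import { Reader, SASL_PLAIN, TLS_1_2 } from "k6/x/kafka";

const reader = new Reader({
    brokers: ["xxx.us-west2.gcp.confluent.cloud:9092"],
    topic: "test_topic",
    maxWait: "5s",           // wait longer per fetch; the default 200ms is very low
    readBatchTimeout: "10s", // give slow first connections more time to return a batch
    sasl: { username: "<username>", password: "<password>", algorithm: SASL_PLAIN },
    tls: { enableTls: true, minVersion: TLS_1_2 },
});
```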
@Dwisf I'll close this ticket due to inactivity. Feel free to re-open it if the issue persists.
@mostafa I get the error below from this code:

    import { Connection } from "k6/x/kafka"; // import kafka extension

    const address = "localhost:9092";
    const topic = "xk6_kafka_test_topic";

    const connection = new Connection({
        address: address,
    });

    const results = connection.listTopics();
    connection.createTopic({ topic: topic });

    export default function () {
        results.forEach((topic) => console.log(topic));
    }

    export function teardown(data) {
        if (__VU == 0) {
            // Delete the topic
            connection.deleteTopic(topic);
        }
        connection.close();
    }

The error:

    time="2024-04-08T17:20:24+05:30" level=error msg="Failed to create dialer., OriginalError: failed to dial: failed to open connection to localhost:9092: dial tcp [::1]:9092: connectex: No connection could be made because the target machine actively refused it." error="Failed to create dialer., OriginalError: failed to dial: failed to open connection to localhost:9092: dial tcp [::1]:9092: connectex: No connection could be made because the target machine actively refused it."
    ERRO[0000] GoError: Failed to create dialer., OriginalError: failed to dial: failed to open connection to localhost:9092: dial tcp [::1]:9092: connectex: No connection could be made because the target machine actively refused it.
        at file:///C:/kafka/k6-kafka/kafka.js:6:19(31)  hint="script exception"
Hey @Madhankumar11,
I won't be debugging scripts and issues that are not directly related to this project. The example scripts work perfectly fine, which means you should spend more time debugging what you wrote rather than posting it here to be resolved.