Broker is unavailable under devcontainer
Describe the bug
Hello, I am trying to run a test with Apache Kafka inside a devcontainer.
I am using testcontainers_modules::kafka::apache::Kafka.
I am running the same test provided here, but for some reason I get the following error:
called Result::unwrap() on an Err value: (KafkaError (Message production error: MessageTimeOut (Local: Message timed out)), OwnedMessage { payload: Some([77, 101, 115, ...]), key: Some([75, 101, 121, 32, 48]), topic: "test-topic", timestamp: NotAvailable, partition: -1, offset: -1001, headers: None })
I wonder if this is related to the devcontainer, and whether I need to use some other IP address instead of 127.0.0.1?
Hi 👋
Perhaps you need to use get_host instead of a hardcoded localhost, which is appropriate when the container isn't running locally but remotely.
Could you try that? And if it doesn't help, could you enrich the context with details of your setup, please?
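Something like this, just as a rough sketch (KAFKA_PORT here is the constant exported by the apache module, and AsyncRunner provides the async start()):

use testcontainers_modules::{
    kafka::apache::{Kafka, KAFKA_PORT},
    testcontainers::runners::AsyncRunner,
};

// Ask the client for the host instead of assuming 127.0.0.1,
// then pair it with the host port mapped for the broker.
let kafka_node = Kafka::default().start().await.unwrap();
let bootstrap_servers = format!(
    "{}:{}",
    kafka_node.get_host().await.unwrap(),
    kafka_node.get_host_port_ipv4(KAFKA_PORT).await.unwrap()
);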
Hi @DDtKey
Yes, I was trying that as well, but it doesn't work for me. Here is what I am trying to do:
use std::time::Duration;

use futures::StreamExt;
use rdkafka::{
    consumer::{Consumer, StreamConsumer},
    producer::{FutureProducer, FutureRecord},
    ClientConfig, Message,
};
use testcontainers_modules::{
    kafka::apache::{Kafka, KAFKA_PORT},
    testcontainers::runners::AsyncRunner,
};

// Start the broker and build the bootstrap address from the resolved host
// and the mapped host port instead of hardcoding 127.0.0.1.
let kafka_node = Kafka::default().with_jvm_image().start().await.unwrap();
let bootstrap_servers = format!(
    "{}:{}",
    kafka_node.get_host().await.unwrap(),
    kafka_node.get_host_port_ipv4(KAFKA_PORT).await.unwrap()
);

let producer = ClientConfig::new()
    .set("bootstrap.servers", &bootstrap_servers)
    .set("message.timeout.ms", "5000")
    .create::<FutureProducer>()
    .expect("Failed to create Kafka FutureProducer");

let consumer = ClientConfig::new()
    .set("group.id", "testcontainer-rs")
    .set("bootstrap.servers", &bootstrap_servers)
    .set("session.timeout.ms", "6000")
    .set("enable.auto.commit", "false")
    .set("auto.offset.reset", "earliest")
    .create::<StreamConsumer>()
    .expect("Failed to create Kafka StreamConsumer");

let topic = "test-topic";
let number_of_messages_to_produce = 5_usize;
let expected: Vec<String> = (0..number_of_messages_to_produce)
    .map(|i| format!("Message {i}"))
    .collect();

// Produce the test messages; this unwrap is where the MessageTimedOut
// error surfaces.
for (i, message) in expected.iter().enumerate() {
    producer
        .send(
            FutureRecord::to(topic)
                .payload(message)
                .key(&format!("Key {i}")),
            Duration::from_secs(0),
        )
        .await
        .unwrap();
}

consumer
    .subscribe(&[topic])
    .expect("Failed to subscribe to a topic");

// Consume the messages back and check they match what was produced.
let mut message_stream = consumer.stream();
for produced in expected {
    let borrowed_message =
        tokio::time::timeout(Duration::from_secs(10), message_stream.next())
            .await
            .unwrap()
            .unwrap();

    assert_eq!(
        produced,
        borrowed_message
            .unwrap()
            .payload_view::<str>()
            .unwrap()
            .unwrap()
    );
}
This is the error that I am getting:
called `Result::unwrap()` on an `Err` value: (KafkaError (Message production error: MessageTimedOut (Local: Message timed out)), OwnedMessage { payload: Some([77, 101, 115, 115, 97, 103, 101, 32, 48]), key: Some([75, 101, 121, 32, 48]), topic: "test-topic", timestamp: NotAvailable, partition: 0, offset: -1001, headers: None })
and this is what I can see from the logs:
ERROR librdkafka > librdkafka: FAIL [thrd:127.0.0.1:55617/1]: 127.0.0.1:55617/1: Connect to ipv4#127.0.0.1:55617 failed: Connection refused (after 0ms in state CONNECT)
ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): 127.0.0.1:55617/1: Connect to ipv4#127.0.0.1:55617 failed: Connection refused (after 0ms in state CONNECT)
ERROR librdkafka > librdkafka: FAIL [thrd:127.0.0.1:55617/1]: 127.0.0.1:55617/1: Connect to ipv4#127.0.0.1:55617 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): 127.0.0.1:55617/1: Connect to ipv4#127.0.0.1:55617 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
Did you customize DOCKER_HOST or anything else? Could you elaborate on your Docker setup?
Also, you can always use export TESTCONTAINERS_COMMAND=keep and then double-check the container with docker inspect, or look at the container's logs after the test run.
I am using a .devcontainer
It basically means only one thing - DinD - but doesn't provide info about possible customization/configuration.
I can only assume it uses some reasonable defaults and that docker.sock is mapped properly without any overridden behavior, because otherwise the container wouldn't even start successfully.
Here the failure happens at the connection step, so it's most likely related to using the proper connection URL (IP) 🤔
The interesting thing is that if I run the command inside the container directly via kafka_node.exec and change bootstrap_servers to
let bootstrap_servers = format!(
    "{}:{}",
    "host.docker.internal",
    kafka_node.get_host_port_ipv4(KAFKA_PORT).await.unwrap()
);
it works just fine.
@DDtKey you said that it "doesn't provide info about possible customization/configuration". What do you mean?
I meant that it's unclear which container runtime is used, how it's configured, and whether the devcontainer has any customization over the Docker socket/host. But generally, since the container starts and works, that's less important. Better to focus on the network settings.
In my .devcontainer I have the following:
"mounts": [
"source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind"
]
I think the problem is definitely with the network settings. I just need to figure out how to solve it...
Generally, I guess the issue is that the container is exposed on your host machine: 127.0.0.1 inside the devcontainer refers to the devcontainer itself, while the Docker socket belongs to the host system.
Did you try to use host.docker.internal? In that case you would be reaching the host machine.
Do you mean in env variables? Or do you mean using it as the host for bootstrap_servers?
When I use host.docker.internal as the host for bootstrap_servers, I start getting the following error:
ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): host.docker.internal:55809/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
ERROR rdkafka::client > librdkafka: Global error: AllBrokersDown (Local: All broker connections are down): 1/1 brokers are down
ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): host.docker.internal:55809/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
ERROR librdkafka > librdkafka: FAIL [thrd:host.docker.internal:55809/bootstrap]: host.docker.internal:55809/bootstrap: Connect to ipv4#192.168.65.254:55809 failed: Connection refused (after 6ms in state CONNECT)
ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): host.docker.internal:55809/bootstrap: Connect to ipv4#192.168.65.254:55809 failed: Connection refused (after 6ms in state CONNECT)
I also tried different cases with
kafka_node.get_host_port_ipv4(testcontainers::core::ContainerPort::Tcp(9092)).await.unwrap() and kafka_node.get_host_port_ipv4(testcontainers::core::ContainerPort::Tcp(9094)).await.unwrap()
Just as an option, you can try specifying a bridge network for your devcontainer and then pass the same network to the testcontainer. After that you can use get_bridge_ip_address 🤔
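Roughly like this, as a sketch ("devcontainer-net" is just a placeholder for whatever user-defined network your devcontainer is attached to; with_network comes from the ImageExt trait):

use testcontainers_modules::{
    kafka::apache::{Kafka, KAFKA_PORT},
    testcontainers::{runners::AsyncRunner, ImageExt},
};

// "devcontainer-net" is a placeholder: use the user-defined network
// that your devcontainer is actually attached to.
let kafka_node = Kafka::default()
    .with_jvm_image()
    .with_network("devcontainer-net")
    .start()
    .await
    .unwrap();

// On a shared network you address the broker by its container IP and
// the container port itself, not the randomly mapped host port.
let bootstrap_servers = format!(
    "{}:{}",
    kafka_node.get_bridge_ip_address().await.unwrap(),
    KAFKA_PORT
);

I haven't checked how the module's advertised listeners behave in that setup though, so treat it as a starting point rather than a guaranteed fix 🤔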