testcontainers-rs-modules-community icon indicating copy to clipboard operation
testcontainers-rs-modules-community copied to clipboard

Broker is unavailable under devcontainer

Open rkhudov opened this issue 11 months ago • 12 comments

Describe the bug

Hello, I am trying to have a test with Apache Kafka under devcontainers. I am using use testcontainers_modules::kafka::apache::Kafka; I am running the same test provided here, but for some reason I get following error:

called Result::unwrap() on an Err value: (KafkaError (Message production error: MessageTimeOut (Local: Message timed out)), OwnedMessage { payload: Some([77, 101, 115, ...]), key: Some([75, 101, 121, 32, 48]), topic: "test-topic", timestamp: NotAvailable, partition: -1, offset: -1001, headers: None })

I wonder if it is related to devcontainer and instead of 127.0.0.1 I have to use another ip address?

To Reproduce

No response

Expected behavior

No response

rkhudov avatar Jan 07 '25 16:01 rkhudov

Hi 👋

Perhaps, you need to use get_host instead of hardcoded localhost. Which is appropriate when container isn't running locally, but remote

Could you try? And if it doesn't help, could you enrich the context with your setup, please

DDtKey avatar Jan 07 '25 18:01 DDtKey

Hi @DDtKey

Yes, I was trying to do it as well, but it doesn't work for me. Here is what I am trying to do:

let kafka_node = Kafka::default().with_jvm_image().start().await.unwrap();

    let bootstrap_servers = format!(
        "{}:{}",
        kafka_node.get_host().await.unwrap(),
        kafka_node.get_host_port_ipv4(KAFKA_PORT).await.unwrap()
    );

    let producer = ClientConfig::new()
        .set("bootstrap.servers", &bootstrap_servers)
        .set("message.timeout.ms", "5000")
        .create::<FutureProducer>()
        .expect("Failed to create Kafka FutureProducer");

    let consumer = ClientConfig::new()
        .set("group.id", "testcontainer-rs")
        .set("bootstrap.servers", &bootstrap_servers)
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .set("auto.offset.reset", "earliest")
        .create::<StreamConsumer>()
        .expect("Failed to create Kafka StreamConsumer");

    let topic = "test-topic";

    let number_of_messages_to_produce = 5_usize;
    let expected: Vec<String> = (0..number_of_messages_to_produce)
        .map(|i| format!("Message {i}"))
        .collect();

    for (i, message) in expected.iter().enumerate() {
        producer
            .send(
                FutureRecord::to(topic)
                    .payload(message)
                    .key(&format!("Key {i}")),
                Duration::from_secs(0),
            )
            .await
            .unwrap();
    }

    consumer
        .subscribe(&[topic])
        .expect("Failed to subscribe to a topic");

    let mut message_stream = consumer.stream();
    for produced in expected {
        let borrowed_message =
            tokio::time::timeout(Duration::from_secs(10), message_stream.next())
                .await
                .unwrap()
                .unwrap();

        assert_eq!(
            produced,
            borrowed_message
                .unwrap()
                .payload_view::<str>()
                .unwrap()
                .unwrap()
        );
    }

This is the error that I am getting:

called `Result::unwrap()` on an `Err` value: (KafkaError (Message production error: MessageTimedOut (Local: Message timed out)), OwnedMessage { payload: Some([77, 101, 115, 115, 97, 103, 101, 32, 48]), key: Some([75, 101, 121, 32, 48]), topic: "test-topic", timestamp: NotAvailable, partition: 0, offset: -1001, headers: None })

and this is what I can see from the logs:

ERROR librdkafka > librdkafka: FAIL [thrd:127.0.0.1:55617/1]: 127.0.0.1:55617/1: Connect to ipv4#127.0.0.1:55617 failed: Connection refused (after 0ms in state CONNECT)
 ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): 127.0.0.1:55617/1: Connect to ipv4#127.0.0.1:55617 failed: Connection refused (after 0ms in state CONNECT)
 ERROR librdkafka      > librdkafka: FAIL [thrd:127.0.0.1:55617/1]: 127.0.0.1:55617/1: Connect to ipv4#127.0.0.1:55617 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
 ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): 127.0.0.1:55617/1: Connect to ipv4#127.0.0.1:55617 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)

rkhudov-tails avatar Jan 07 '25 19:01 rkhudov-tails

Did you customize DOCKER_HOST or anything else? Could you elaborate on your docker setup?

Also you always can use export TESTCONTAINERS_COMMAND=keep and double-check the container with docker inspect or check logs of the container after test-run

DDtKey avatar Jan 07 '25 19:01 DDtKey

I am using .devcontainer

rkhudov-tails avatar Jan 07 '25 19:01 rkhudov-tails

It basically means only one thing - DinD, but doesn't provide info about possible customization/configuration. I can only assume it's some reasonable defaults and docker.sock is mapped properly without any overridden behavior, because otherwise container wouldn't even start successfully.

Here is the fail happen on connection step. So it's most likely related to proper connection url (IP) 🤔

DDtKey avatar Jan 07 '25 20:01 DDtKey

Interesting thing that if I do command in container directly by using kafka_node.exec and changing bootstrap_servers to

let bootstrap_servers = format!(
        "{}:{}",
        "host.docker.internal",
        kafka_node.get_host_port_ipv4(KAFKA_PORT).await.unwrap()
    );

it works just fine.

rkhudov avatar Jan 07 '25 20:01 rkhudov

@DDtKey you said that doesn't provide info about possible customization/configuration. What do you mean?

rkhudov-tails avatar Jan 08 '25 10:01 rkhudov-tails

I meant that it's unclear which container runtime is used, how it's configured and if devcontainer has any customization over docker socker/host. But generally, since container is started and working - it's less important. Better to focus on network settings

DDtKey avatar Jan 08 '25 11:01 DDtKey

In .devcontainer I do have following:

"mounts": [
        "source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind"
    ]

I think the problem is definitely with network settings. Just need to figure out how to solve it...

rkhudov-tails avatar Jan 08 '25 12:01 rkhudov-tails

Generally, I guess the issue is that container is available on your host machine. 127.0.0.1 inside devcontainer refers to itself, but socket is used from host system.

Did you try to use host.docker.internal? Because in that case you will be accessing host machine

DDtKey avatar Jan 08 '25 12:01 DDtKey

Do you mean in env variables? or you mean you it as host for bootstrap_servers?

When I am using host.docker.internal as host for bootstrap_servers, I start to get following error:

ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): host.docker.internal:55809/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
 ERROR rdkafka::client > librdkafka: Global error: AllBrokersDown (Local: All broker connections are down): 1/1 brokers are down
 ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): host.docker.internal:55809/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
 ERROR librdkafka      > librdkafka: FAIL [thrd:host.docker.internal:55809/bootstrap]: host.docker.internal:55809/bootstrap: Connect to ipv4#192.168.65.254:55809 failed: Connection refused (after 6ms in state CONNECT)
 ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): host.docker.internal:55809/bootstrap: Connect to ipv4#192.168.65.254:55809 failed: Connection refused (after 6ms in state CONNECT)

I also tried different cases with kafka_node.get_host_port_ipv4(testcontainers::core::ContainerPort::Tcp(9092)).await.unwrap() and kafka_node.get_host_port_ipv4(testcontainers::core::ContainerPort::Tcp(9094)).await.unwrap()

rkhudov-tails avatar Jan 08 '25 13:01 rkhudov-tails

Just as an option, you can try to specify some bridge network for your devcontainer and then pass the same network to testcontainer. After which you can use get_bridge_ip_address 🤔

DDtKey avatar Jan 08 '25 13:01 DDtKey