rust-rdkafka icon indicating copy to clipboard operation
rust-rdkafka copied to clipboard

Confluent Cloud and rdkafka with ssl-vendored, broker certificate could not be verified

Open blake-ec opened this issue 1 year ago • 0 comments

When using rdkafka feature ssl-vendored instead of ssl, kafka consumer fails to connect to a SASL auth-enabled Confluent Cloud cluster, and no messages are consumed

  • Minimal reproducible example below
  • Consumer works as expected on with feature = ssl but not with ssl-vendored
  • Consumer works on Ubuntu and ContainerOS Linux, but will not compile and pass tests on macOS with feature = ssl (I don't have logs showing the exact macOS build issue)
  • The original interest for ssl-vendored was to allow macOS developers to compile and test our kafka code -- but please mention if there is a suitable OS package for Mac that supports the dynamically linked ssl feature (with SASL support) instead of the statically linked ssl-vendored, possibly we just have not located the correct package or brew command.
  • This may be specific to Confluent Cloud, but that's the only SSL + SASL environment I have to test with

Error seen with ssl-vendored, which is not present with ssl:

[DEBUG] - librdkafka: FAIL [thrd:sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.clou]: sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.cloud:9092/bootstrap: SSL handshake failed: ssl/statem/statem_clnt.c:1919: error:1416F086:SSL routines:tls_process_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (install ca-certificates package) (after 3ms in state SSL_HANDSHAKE) (_SSL)

Example code:

Cargo.toml

[package]
name = "rust-kafka-sasl"
version = "0.1.0"
edition = "2021"

[dependencies]
futures = "0.3.16"
tokio = { version = "1.0", features = ["full"] }
env_logger = "0.9"
log = "0.4"

# ssl works
# rdkafka = { version = "0.32.2", features = ["ssl"] }

# ssl-vendored fails with SSL HANDSHAKE error
rdkafka = { version = "0.32.2", features = ["ssl-vendored"] }

main.rs

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{base_consumer::BaseConsumer, Consumer};
use rdkafka::Message;
use env_logger::Builder;
use log::LevelFilter;
use std::io::Write;

fn main() {
    Builder::new()
        .format(|buf, record| writeln!(buf, "[{}] - {}", record.level(), record.args()))
        .filter(None, LevelFilter::Debug)
        .init();

    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "hostname_redacted.us-central1.gcp.glb.confluent.cloud:9092")
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanisms", "PLAIN")
        .set("sasl.username", "username_redacted")
        .set("sasl.password", "password_redacted")
        .set("group.id", "rust_confluent_consumer")
        .set("auto.offset.reset", "earliest")
        .set("debug", "all")
        .create()
        .expect("Consumer creation failed");

    println!("Created consumer.");

    consumer
        .subscribe(&["test-topic"])
        .expect("Can't subscribe to specified topic");

    println!("Starting to receive messages.");

    loop {
        match consumer.poll(None) {
            Some(Ok(m)) => {
                let key = match m.key_view::<str>() {
                    None => "".to_string(),
                    Some(Ok(s)) => s.to_string(),
                    Some(Err(e)) => {
                        println!("Error deserializing key: {:?}", e);
                        "".to_string()
                    }
                };

                let payload = match m.payload_view::<str>() {
                    None => "".to_string(),
                    Some(Ok(s)) => s.to_string(),
                    Some(Err(e)) => {
                        println!("Error deserializing payload: {:?}", e);
                        "".to_string()
                    }
                };

                println!("key: '{}', payload: '{}', topic: {}, partition: {}, offset: {}",
                    key, payload, m.topic(), m.partition(), m.offset());
            },
            Some(Err(e)) => {
                println!("Error receiving message: {:?}", e);
            },
            None => {}
        }
    }
}

Example Dockerfile (also built locally without Docker with same issue)

FROM rust:1.70 as builder
LABEL repository="rust-kafka-sasl"

WORKDIR /app

COPY . .

RUN cargo build --release

FROM debian:bullseye

RUN apt-get update \
    && apt-get install -y ca-certificates libssl-dev libsasl2-2 libsasl2-dev libsasl2-modules \
    && rm -rf /var/lib/apt/lists/*

COPY --from=builder /app/target/release/rust-kafka-sasl .

CMD ["./rust-kafka-sasl"]

Expected output:

[DEBUG] - librdkafka: SASL [thrd:app]: Selected provider PLAIN (builtin) for SASL mechanism PLAIN
[DEBUG] - librdkafka: OPENSSL [thrd:app]: Using OpenSSL version OpenSSL 1.1.1n  15 Mar 2022 (0x101010ef, librdkafka built with 0x101010ef)
...
[DEBUG] - librdkafka: CONNECT [thrd:main]: sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.cloud:9092/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s))
...
[DEBUG] - librdkafka: CONNECT [thrd:sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.clou]: sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.cloud:9092/bootstrap: Received CONNECT op
...
[DEBUG] - librdkafka: SUBSCRIBE [thrd:main]: Group "rust_confluent_consumer": subscribe to new subscription of 1 topics (join-state init)
...
Created consumer.
Starting to receive messages.
... (messages on the topic appear here)

Actual output with feature = ssl-vendored:

[DEBUG] - librdkafka: CONNECT [thrd:sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.clou]: sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.cloud:9092/bootstrap: Connecting to ipv4#10.1.16.110:9092 (sasl_ssl) with socket 9
[DEBUG] - librdkafka: CONNECT [thrd:sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.clou]: sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.cloud:9092/bootstrap: Connected to ipv4#10.1.16.110:9092
[DEBUG] - librdkafka: STATE [thrd:sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.clou]: sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.cloud:9092/bootstrap: Broker changed state CONNECT -> SSL_HANDSHAKE
[DEBUG] - librdkafka: BROADCAST [thrd:sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.clou]: Broadcasting state change
[DEBUG] - librdkafka: FAIL [thrd:sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.clou]: sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.cloud:9092/bootstrap: SSL handshake failed: ssl/statem/statem_clnt.c:1919: error:1416F086:SSL routines:tls_process_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (install ca-certificates package) (after 3ms in state SSL_HANDSHAKE) (_SSL): identical to last error: error log suppressed
[DEBUG] - librdkafka: STATE [thrd:sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.clou]: sasl_ssl://hostname_redacted.us-central1.gcp.glb.confluent.cloud:9092/bootstrap: Broker changed state SSL_HANDSHAKE -> DOWN

blake-ec avatar Jun 27 '23 15:06 blake-ec