rust-rdkafka icon indicating copy to clipboard operation
rust-rdkafka copied to clipboard

How to authenticate using a CA certificate, access certificate, and access key

Open vizvasrj opened this issue 5 months ago • 1 comments

Please, I need the Rust equivalent of the following. Here is the Go version of the producer:

package main

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

// main sends 100 messages, one per second, to a Kafka topic over a TLS
// connection that presents a client certificate.
//
// It reads three files from the working directory: the client certificate
// ("service.cert"), its private key ("service.key"), and the CA certificate
// ("ca.pem") used as the trust root for the broker. Failure to load any of
// them terminates the program.
func main() {
    const topicName = "default_topic"

    // Client certificate and key offered to the broker during the TLS handshake.
    keypair, err := tls.LoadX509KeyPair("service.cert", "service.key")
    if err != nil {
        log.Fatalf("Failed to load access key and/or access certificate: %s", err)
    }

    caCert, err := ioutil.ReadFile("ca.pem")
    if err != nil {
        log.Fatalf("Failed to read CA certificate file: %s", err)
    }

    // AppendCertsFromPEM only reports success or failure; there is no error
    // value to print here (err is nil at this point).
    caCertPool := x509.NewCertPool()
    if !caCertPool.AppendCertsFromPEM(caCert) {
        log.Fatal("Failed to parse CA certificate file: no valid PEM certificate found")
    }

    dialer := &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS: &tls.Config{
            Certificates: []tls.Certificate{keypair},
            RootCAs:      caCertPool,
        },
    }

    // init producer
    producer := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{"kafka-257bfc54-lakjos-f2b6.a.aivencloud.com:19190"},
        Topic:   topicName,
        Dialer:  dialer,
    })

    // produce 100 messages
    for i := 0; i < 100; i++ {
        message := fmt.Sprint("Hello from Go using SSL ", i+1, "!")
        err := producer.WriteMessages(context.Background(), kafka.Message{Value: []byte(message)})
        if err != nil {
            // Report the failure instead of claiming the message was sent,
            // then carry on with the next message.
            log.Printf("Failed to send message %q: %s", message, err)
        } else {
            // Pass the payload as an argument so a '%' in it is never
            // interpreted as a formatting verb.
            log.Printf("Message sent: %s", message)
        }
        time.Sleep(time.Second)
    }

    if err := producer.Close(); err != nil {
        log.Fatalf("Failed to close producer: %s", err)
    }
}

And here is the consumer:

package main

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

// main reads messages from a Kafka topic over a TLS connection that presents
// a client certificate, logging each message value as it arrives. It loops
// forever and never returns on its own.
//
// It reads three files from the working directory: the client certificate
// ("service.cert"), its private key ("service.key"), and the CA certificate
// ("ca.pem") used as the trust root for the broker. Failure to load any of
// them terminates the program.
func main() {
    const topicName = "default_topic"

    // Client certificate and key offered to the broker during the TLS handshake.
    keypair, err := tls.LoadX509KeyPair("service.cert", "service.key")
    if err != nil {
        log.Fatalf("Failed to load access key and/or access certificate: %s", err)
    }

    caCert, err := ioutil.ReadFile("ca.pem")
    if err != nil {
        log.Fatalf("Failed to read CA certificate file: %s", err)
    }

    // AppendCertsFromPEM only reports success or failure; there is no error
    // value to print here (err is nil at this point).
    caCertPool := x509.NewCertPool()
    if !caCertPool.AppendCertsFromPEM(caCert) {
        log.Fatal("Failed to parse CA certificate file: no valid PEM certificate found")
    }

    dialer := &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS: &tls.Config{
            Certificates: []tls.Certificate{keypair},
            RootCAs:      caCertPool,
        },
    }

    // init consumer
    consumer := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"kafka-257bfc54-lakjos-f2b6.a.aivencloud.com:19190"},
        Topic:   topicName,
        Dialer:  dialer,
    })

    for {
        message, err := consumer.ReadMessage(context.Background())
        if err != nil {
            log.Printf("Could not read message: %s", err)
            // Pause before retrying so a failure that keeps recurring does
            // not turn into a busy loop that floods the log.
            time.Sleep(time.Second)
            continue
        }
        log.Printf("Got message using SSL: %s", message.Value)
    }
}

vizvasrj avatar Feb 05 '24 11:02 vizvasrj

Did you find any way to do this?

conradogarciaberrotaran avatar Mar 14 '24 15:03 conradogarciaberrotaran