kafka-go

Transport doesn't use Resolver on initial connection

Open · smw1218 opened this issue 2 years ago · 1 comment

Describe the bug

Transport.Resolver docs state:

// When set, the Dial function is not responsible for performing name
// resolution, and is always called with a pre-resolved address.
Resolver BrokerResolver

However, the resolved address is not used for the first connection in func (g *connGroup) grabConnOrConnect(ctx context.Context) (*conn, error) (https://github.com/segmentio/kafka-go/blob/main/transport.go#L1013), which dials the passed-in address and ignores the resolved one.

Kafka Version

main

To Reproduce

Add a custom Resolver to a Transport, and set Transport.Dial to a function that logs the address it is passed. Note that the logged address is not the IP returned by the resolver.

package main

import (
	"context"
	"log"
	"net"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	w := &kafka.Writer{
		Addr:                   kafka.TCP("localhost:9092"),
		AllowAutoTopicCreation: true,
		Transport:              getTransport(),
	}
	err := w.WriteMessages(context.Background(), kafka.Message{
		Topic: "topic",
		Value: []byte("message"),
	})
	if err != nil {
		log.Printf("failed write: %v", err)
	}
	log.Println("done")
}

func getTransport() kafka.RoundTripper {
	dw := &DialWrapper{
		Dial: (&net.Dialer{
			Timeout: 3 * time.Second,
		}).DialContext,
	}
	transport := &kafka.Transport{
		Dial:     dw.LocalhostDialer,
		Resolver: NewLocalHostResolver(),
	}
	return transport
}

type DialWrapper struct {
	Dial func(context.Context, string, string) (net.Conn, error)
}

// LocalhostDialer logs the address that Transport passes to Dial, then dials it unchanged.
func (dw *DialWrapper) LocalhostDialer(ctx context.Context, network string, address string) (net.Conn, error) {
	log.Printf("address: %v", address)
	return dw.Dial(ctx, network, address)
}

func NewLocalHostResolver() alwaysResolver {
	return alwaysResolver(
		net.IPAddr{
			IP: net.ParseIP("127.0.0.1"),
		},
	)
}

// alwaysResolver implements kafka.BrokerResolver by resolving every broker to the same IP.
type alwaysResolver net.IPAddr

func (ar alwaysResolver) LookupBrokerIPAddr(ctx context.Context, broker kafka.Broker) ([]net.IPAddr, error) {
	return []net.IPAddr{net.IPAddr(ar)}, nil
}

Expected Behavior

The log message should print the resolved address: address: 127.0.0.1:9092

Observed Behavior

The log message prints the unresolved address: address: localhost:9092
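One way to turn this observation into an explicit check is a small diagnostic helper that could be called from LocalhostDialer in the reproduction: it reports whether the host part of the address Dial receives is already an IP literal. The name isPreResolved is hypothetical; the sketch relies only on net.SplitHostPort and net.ParseIP from the standard library.

```go
package main

import (
	"fmt"
	"net"
)

// isPreResolved is a hypothetical diagnostic helper: it reports whether the
// host part of a "host:port" address is already an IP literal rather than a
// name that still needs DNS resolution.
func isPreResolved(address string) bool {
	host, _, err := net.SplitHostPort(address)
	if err != nil {
		return false // not a valid host:port pair
	}
	return net.ParseIP(host) != nil
}

func main() {
	for _, addr := range []string{"127.0.0.1:9092", "localhost:9092"} {
		// Prints "127.0.0.1:9092 pre-resolved: true", then "localhost:9092 pre-resolved: false".
		fmt.Printf("%s pre-resolved: %v\n", addr, isPreResolved(addr))
	}
}
```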

Additional Context

The workaround is easy: override the address in a custom Transport.Dial. But I wasted a lot of time on the Resolver, which doesn't work the way the documentation led me to expect.

My use case is accessing the bitnami/kafka Docker container from the host network. Kafka returns internal broker names that only resolve inside the Docker network, but they always correspond to the host-exposed 127.0.0.1:9092.

smw1218 · Jul 02 '23 15:07

+1, I faced a similar problem. In my app I use an initial "ping" mechanism and call dialer.DialLeader to make sure the connection is OK. For some reason, when I use a non-localhost Kafka instance (e.g. on a machine in my local network), the broker address falls back to localhost:9092. This was very hard to diagnose; can it be fixed?

rovnyart · Aug 02 '23 18:08