kafka-go
Transport doesn't use Resolver on initial connection
Describe the bug
The Transport.Resolver docs state:
	// When set, the Dial function is not responsible for performing name
	// resolution, and is always called with a pre-resolved address.
	Resolver BrokerResolver
(BrokerResolver: https://pkg.go.dev/github.com/segmentio/kafka-go#BrokerResolver)
However, func (g *connGroup) grabConnOrConnect(ctx context.Context) (*conn, error) does not use the resolved address for the first connection: it dials the address that was passed in and ignores the resolved one (https://github.com/segmentio/kafka-go/blob/main/transport.go#L1013).
Kafka Version
main
To Reproduce
Set a custom Resolver on a Transport and override Transport.Dial to log the address it is called with. The logged address is not the IP returned by the resolver.
package main

import (
	"context"
	"log"
	"net"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	w := &kafka.Writer{
		Addr:                   kafka.TCP("localhost:9092"),
		AllowAutoTopicCreation: true,
		Transport:              getTransport(),
	}

	err := w.WriteMessages(context.Background(), kafka.Message{
		Topic: "topic",
		Value: []byte("message"),
	})
	if err != nil {
		log.Printf("failed write: %v", err)
	}
	log.Println("done")
}

func getTransport() kafka.RoundTripper {
	dw := &DialWrapper{
		// net.Dialer.DualStack is deprecated: fast fallback is enabled by default.
		Dial: (&net.Dialer{
			Timeout: 3 * time.Second,
		}).DialContext,
	}
	transport := &kafka.Transport{
		Dial:     dw.LocalhostDialer,     // logs every address Dial is called with
		Resolver: NewLocalHostResolver(), // per the docs, Dial should then see 127.0.0.1:9092
	}
	return transport
}

// DialWrapper logs the address passed to Dial before delegating to the real dialer.
type DialWrapper struct {
	Dial func(context.Context, string, string) (net.Conn, error)
}

func (dw *DialWrapper) LocalhostDialer(ctx context.Context, network string, address string) (net.Conn, error) {
	log.Printf("address: %v", address)
	return dw.Dial(ctx, network, address)
}

func NewLocalHostResolver() alwaysResolver {
	return alwaysResolver(net.IPAddr{
		IP: net.ParseIP("127.0.0.1"),
	})
}

// alwaysResolver implements kafka.BrokerResolver by resolving every broker to the same IP.
type alwaysResolver net.IPAddr

func (ar alwaysResolver) LookupBrokerIPAddr(ctx context.Context, broker kafka.Broker) ([]net.IPAddr, error) {
	return []net.IPAddr{net.IPAddr(ar)}, nil
}
Expected Behavior
The log message should print the resolved address: address: 127.0.0.1:9092
Observed Behavior
The log message prints the unresolved address: address: localhost:9092
Additional Context
The workaround is simple: override the address in a custom Transport.Dial. However, I wasted a lot of time experimenting with the Resolver, which does not behave the way the documentation suggests.
My use case is connecting from the host network to the bitnami/kafka Docker container. Kafka returns internal broker names that resolve only inside the Docker network, but they always correspond to 127.0.0.1:9092 as exposed on the host.
+1, I ran into a similar problem. My app has an initial "ping" step that calls dialer.DialLeader to make sure the connection works. For some reason, when I use a non-localhost Kafka instance (e.g. on another machine in my local network), the broker address falls back to localhost:9092. This was very hard to diagnose; can it be fixed?