confluent-kafka-go
Error when working with go gin
Description
I use confluent-kafka-go together with the github.com/gin-gonic/gin web server.
The overall logic is: read the message info from an HTTP GET request, convert it to a JSON string, and write that JSON string to Kafka using the confluent-kafka-go asynchronous producer.
The code looks like this:
func AsyncProducer(p *kafka.Producer, topic string, msg string) {
	//-- -------------------------------
	//--> @Description
	//-->
	// https://docs.confluent.io/kafka-clients/go/current/overview.html#asynchronous-writes
	deliveryChan := make(chan kafka.Event, 1000)
	err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(msg),
		Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
	}, deliveryChan)
	if err != nil {
		fmt.Println("Failed to produce message")
		fmt.Printf("some error: \n%v\n", err)
		panic(err)
	}
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("=Failed to deliver message: %v\n%s\n=", ev.TopicPartition, ev.TopicPartition.Error)
				} else {
					fmt.Printf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
						*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
				}
			case kafka.Error:
				// Generic client instance-level errors, such as
				// broker connection failures, authentication issues, etc.
				//
				// These errors should generally be considered informational
				// as the underlying client will automatically try to
				// recover from any errors encountered, the application
				// does not need to take action on them.
				fmt.Printf("kafka Error: %v\n", ev)
			default:
				fmt.Printf("Ignored kafka event: %s\n", ev)
			}
		}
	}()
}
When I access the web server, it returns HTTP 500 with this error:
Local: Invalid argument or configuration
If I add os.Exit(0) before starting go-gin, no message is written to Kafka. If I add os.Exit(0) before starting go-gin but sleep for a while first (about 10 seconds), the test messages are written to Kafka correctly.
If I remove the os.Exit(0), the gin web server starts fine, but when I access it with an HTTP GET request it returns HTTP 500.
So I don't know why this happens or how to fix it.
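The os.Exit(0) observation is consistent with how an asynchronous producer behaves: producing only enqueues the message, and delivery happens in the background, so exiting immediately can drop whatever is still queued (the real client provides `Flush` for waiting on outstanding messages). The sketch below is a stdlib-only analogy, not confluent-kafka-go code; `asyncQueue` and its methods are hypothetical names used purely for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// asyncQueue is a toy stand-in for an asynchronous producer (hypothetical,
// not part of confluent-kafka-go). Produce only enqueues; a background
// goroutine performs the "delivery".
type asyncQueue struct {
	ch        chan string
	wg        sync.WaitGroup
	mu        sync.Mutex
	delivered []string
}

func newAsyncQueue() *asyncQueue {
	q := &asyncQueue{ch: make(chan string, 16)}
	go func() {
		for m := range q.ch {
			q.mu.Lock()
			q.delivered = append(q.delivered, m)
			q.mu.Unlock()
			q.wg.Done() // mark this message as delivered
		}
	}()
	return q
}

// Produce enqueues a message and returns right away, before delivery.
func (q *asyncQueue) Produce(m string) {
	q.wg.Add(1)
	q.ch <- m
}

// Flush blocks until every enqueued message has been delivered.
func (q *asyncQueue) Flush() { q.wg.Wait() }

// Delivered reports how many messages have been delivered so far.
func (q *asyncQueue) Delivered() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.delivered)
}

func main() {
	q := newAsyncQueue()
	for _, m := range []string{"a", "b", "c"} {
		q.Produce(m)
	}
	// Without this Flush, exiting here could happen before the background
	// goroutine has delivered anything.
	q.Flush()
	fmt.Println("delivered:", q.Delivered())
}
```

In this analogy, sleeping before exit only works by accident because it gives the background goroutine time to drain the queue; an explicit wait is the more reliable choice.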
How to reproduce
Checklist
Please provide the following information:
- [x] confluent-kafka-go and librdkafka version (LibraryVersion()): v2.3.0
- [x] Apache Kafka broker version: 3.6.1
- [x] Client configuration (ConfigMap{...}):
&map[acks:1 bootstrap.servers:192.168.11.60:9092,192.168.11.61:9092,192.168.11.62:9092 debug:msg delivery.report.only.error:false linger.ms:500 message.max.bytes:1000000 retries:2147482647 retry.backoff.ms:1000 security.protocol:plaintext sticky.partitioning.linger.ms:1000]
- [x] Operating system: WSL: kali 2023.4
- [x] Provide client logs (with "debug": ".." as necessary)
Failed to produce message
some error:
Local: Invalid argument or configuration
Local: Invalid argument or configuration
/mnt/e/go/go_gin_kafka/utils/bigdata/gokafka.go:154 (0x59a4c4)
AsyncProducer: panic(err)
/mnt/e/go/go_gin_kafka/webserver.go:78 (0x86718b)
WriteKafka: goKafka.AsyncProducer(p, ConfMap["topic"], message)
/home/darkz/go/pkg/mod/github.com/gin-gonic/[email protected]/context.go:174 (0x86765b)
(*Context).Next: c.handlers[c.index](c)
/mnt/e/go/go_gin_kafka/webserver.go:141 (0x867649)
DataCheck: c.Next()
/home/darkz/go/pkg/mod/github.com/gin-gonic/[email protected]/context.go:174 (0x86749e)
(*Context).Next: c.handlers[c.index](c)
/mnt/e/go/go_gin_kafka/webserver.go:123 (0x86748c)
Url2Json: c.Next() // works like goto: jumps straight to the next middleware, runs it to completion, then returns
/home/darkz/go/pkg/mod/github.com/gin-gonic/[email protected]/context.go:174 (0x83b599)
(*Context).Next: c.handlers[c.index](c)
/home/darkz/go/pkg/mod/github.com/gin-gonic/[email protected]/recovery.go:102 (0x83b587)
CustomRecoveryWithWriter.func1: c.Next()
/home/darkz/go/pkg/mod/github.com/gin-gonic/[email protected]/context.go:174 (0x83a6dc)
(*Context).Next: c.handlers[c.index](c)
/home/darkz/go/pkg/mod/github.com/gin-gonic/[email protected]/logger.go:240 (0x83a6c3)
LoggerWithConfig.func1: c.Next()
/home/darkz/go/pkg/mod/github.com/gin-gonic/[email protected]/context.go:174 (0x839caa)
(*Context).Next: c.handlers[c.index](c)
/home/darkz/go/pkg/mod/github.com/gin-gonic/[email protected]/gin.go:656 (0x839c97)
serveError: c.Next()
/home/darkz/go/pkg/mod/github.com/gin-gonic/[email protected]/gin.go:649 (0x839a04)
(*Engine).handleHTTPRequest: serveError(c, http.StatusNotFound, default404Body)
/home/darkz/go/pkg/mod/github.com/gin-gonic/[email protected]/gin.go:576 (0x8394f1)
(*Engine).ServeHTTP: engine.handleHTTPRequest(c)
/home/darkz/go/pkg/mod/golang.org/[email protected]/src/net/http/server.go:3137 (0x6e0b6d)
serverHandler.ServeHTTP: handler.ServeHTTP(rw, req)
/home/darkz/go/pkg/mod/golang.org/[email protected]/src/net/http/server.go:2039 (0x6dc6e7)
(*conn).serve: serverHandler{c.server}.ServeHTTP(w, w.req)
/home/darkz/go/pkg/mod/golang.org/[email protected]/src/runtime/asm_amd64.s:1695 (0x47a420)
goexit: BYTE $0x90 // NOP
- [x] Provide broker log excerpts
None....
- [x] Critical issue
Hi @darrkz, can you log the topic name before producing? The error might be because the topic string is empty.
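One way to act on this suggestion is to log and validate the topic just before the produce call, so that an empty topic fails with an explicit message instead of a generic client error. This is a minimal stdlib-only sketch; `checkTopic` and `errEmptyTopic` are hypothetical names, not part of confluent-kafka-go.

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"strings"
)

// errEmptyTopic is a hypothetical sentinel error for this sketch.
var errEmptyTopic = errors.New("kafka topic name is empty")

// checkTopic is a hypothetical guard to call before producing.
// It rejects topics that are empty or contain only whitespace.
func checkTopic(topic string) error {
	if strings.TrimSpace(topic) == "" {
		return errEmptyTopic
	}
	return nil
}

func main() {
	for _, topic := range []string{"", "events"} {
		// %q makes an empty string visible in the log as "".
		log.Printf("about to produce to topic %q", topic)
		if err := checkTopic(topic); err != nil {
			fmt.Println("refusing to produce:", err)
			continue
		}
		fmt.Println("ok to produce to", topic)
	}
}
```

Logging with `%q` rather than `%s` is a deliberate choice here: an empty topic prints as `""` instead of disappearing from the log line.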
@milindl That's OK. I found that I had declared a variable outside of main but then redeclared it inside main with :=, so the outer variable was never set and could not be used anywhere else. I also think the log does not show this error clearly; it cost me a lot of time....
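The mistake described here is Go variable shadowing: `:=` inside a function declares a new local variable even when a package-level variable of the same name exists. The sketch below reproduces it with the standard library only; `confMap`, `loadConfig`, and the other names are hypothetical and only loosely modeled on the `ConfMap["topic"]` lookup visible in the stack trace.

```go
package main

import "fmt"

// confMap is a hypothetical package-level configuration map that other
// functions (for example, HTTP handlers) read from.
var confMap map[string]string

// loadConfig stands in for whatever builds the configuration.
func loadConfig() map[string]string {
	return map[string]string{"topic": "events"}
}

// setupShadowed reproduces the mistake: := declares a NEW local confMap,
// so the package-level confMap is left untouched (nil).
func setupShadowed() int {
	confMap := loadConfig()
	return len(confMap)
}

// setupAssigned uses plain assignment, which sets the package-level variable.
func setupAssigned() int {
	confMap = loadConfig()
	return len(confMap)
}

// topicFromConfig is what code outside the setup function sees.
// Reading a missing key (or a nil map) yields the empty string.
func topicFromConfig() string {
	return confMap["topic"]
}

func main() {
	setupShadowed()
	fmt.Printf("after := topic=%q\n", topicFromConfig())

	setupAssigned()
	fmt.Printf("after =  topic=%q\n", topicFromConfig())
}
```

Because the shadowed version compiles without complaint, the empty topic only surfaces later at produce time, which matches the symptom reported in this issue.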
Alright @darrkz, thanks for the update. Would you be interested in making a PR to improve the messaging around the error? I'm not certain how the error was solved, so it might be useful to others in the future.
@milindl It was just my mistake, not something wrong with confluent-kafka-go...
I will make the PR later...
@milindl The PR is here: https://github.com/confluentinc/confluent-kafka-go/pull/1162
Please review it.