goczmq
goczmq copied to clipboard
Sometimes getting "frame error" on rx/tx, but retrying the method call fixes it
Sometimes a call to RecvMessage() returns "recv frame error" and a call to SendFrame() returns "send frame error". Simply retrying the call works around the problem.
Steps to reproduce: run this program (the for loop is the important part):
package main
import (
"github.com/zeromq/goczmq"
"log"
"os"
)
// main is a minimal reproduction of the intermittent "frame error" problem.
//
// It creates a ROUTER socket on rxUrl and a DEALER socket on txUrl (taken
// from the two command-line arguments, or defaults when they are not both
// given), sends one seed frame through the dealer, and then loops forever:
// receive a message on the router, send its second frame back out through
// the dealer. Every failed receive or send is logged and retried immediately.
func main() {
	rxURL, txURL := "tcp://*:5555", "tcp://127.0.0.1:5555"
	if len(os.Args) == 3 {
		rxURL, txURL = os.Args[1], os.Args[2]
	} else {
		log.Println("Usage: proxy rxUrl txUrl")
		log.Println("Using default values: proxy \"tcp://*:5555\" \"tcp://127.0.0.1:5555\"")
	}

	router, err := goczmq.NewRouter(rxURL)
	if err != nil {
		log.Fatal(err)
	}
	defer router.Destroy()

	dealer, err := goczmq.NewDealer(txURL)
	if err != nil {
		log.Fatal(err)
	}
	defer dealer.Destroy()

	// Seed frame that starts the endless receive/send cycle below.
	if err = dealer.SendFrame([]byte("123"), goczmq.FlagNone); err != nil {
		log.Fatal(err)
	}

	// iteration is 1-based so the log output identifies the failing round trip.
	for iteration := 1; ; iteration++ {
		var frames [][]byte

		// Retry the receive until it succeeds, logging each failure.
		for {
			frames, err = router.RecvMessage()
			if err == nil {
				break
			}
			log.Printf("rx error \"%v\" on iteration %v\n", err, iteration)
		}

		// Retry the send until it succeeds, logging each failure.
		// NOTE(review): frames[0] is presumably the routing identity added by
		// the ROUTER socket, which is why only frames[1] is forwarded; confirm.
		for {
			err = dealer.SendFrame(frames[1], goczmq.FlagNone)
			if err == nil {
				break
			}
			log.Printf("tx error \"%v\" on iteration %v\n", err, iteration)
		}
	}
}
Sample output:
2022/06/02 16:17:18 Usage: proxy rxUrl txUrl
2022/06/02 16:17:18 Using default values: proxy "tcp://*:5555" "tcp://127.0.0.1:5555"
2022/06/02 16:17:18 rx error "recv frame error" on iteration 12506
2022/06/02 16:17:19 rx error "recv frame error" on iteration 32214
2022/06/02 16:17:19 rx error "recv frame error" on iteration 48934
2022/06/02 16:17:19 rx error "recv frame error" on iteration 71562
2022/06/02 16:17:20 rx error "recv frame error" on iteration 77118
2022/06/02 16:17:25 rx error "recv frame error" on iteration 342318
2022/06/02 16:17:25 rx error "recv frame error" on iteration 348342
2022/06/02 16:17:25 rx error "recv frame error" on iteration 351665
2022/06/02 16:17:26 tx error "send frame error" on iteration 365049
2022/06/02 16:17:26 rx error "recv frame error" on iteration 377613
2022/06/02 16:17:26 rx error "recv frame error" on iteration 393592
2022/06/02 16:17:27 rx error "recv frame error" on iteration 414721
2022/06/02 16:17:27 rx error "recv frame error" on iteration 424175
^C
I'm seeing the same behavior. You can reproduce it with this:
Server:
// Weather publisher: once per second it publishes "<zipcode> <temperature>
// <relhumidity>" on a PUB socket configured with both a TCP and an IPC endpoint.
socket, err := goczmq.NewPub("tcp://*:5556,ipc://weather.ipc")
if err != nil {
	log.Fatal(err)
}
defer socket.Destroy()
rand.Seed(time.Now().UnixNano())
for {
	// The zip code is pinned (instead of random) so every update uses the
	// same "59937" prefix.
	zipcode := 59937 //rand.Intn(100000)
	temperature := rand.Intn(215) - 80
	relhumidity := rand.Intn(50) + 10
	msg := fmt.Sprintf("%d %d %d", zipcode, temperature, relhumidity)
	// Printf rather than Println: Println does not interpret format verbs
	// and would print the "%s" literally.
	fmt.Printf("sending %s\n", msg)
	// Report send failures instead of silently discarding them; keep
	// publishing so a transient error does not stop the server.
	if err := socket.SendFrame([]byte(msg), goczmq.FlagNone); err != nil {
		log.Printf("send error: %v", err)
	}
	time.Sleep(1000 * time.Millisecond)
}
Client:
// Weather subscriber: receives up to 101 updates for zip code prefix "59937"
// directly from the SUB socket and prints the average of the temperatures
// that could be parsed.
socket, err := goczmq.NewSub("tcp://127.0.0.1:5556", "59937")
if err != nil {
	log.Fatal(err)
}
defer socket.Destroy()
// samples counts only the updates that were parsed successfully, so the
// average is not skewed by the loop bound or by malformed messages.
var totalTemp, samples int64
for i := 0; i < 101; i++ {
	fmt.Println("receiving message...")
	datapt, err := socket.RecvMessage()
	if err != nil {
		// %v, not %w: the %w verb is only meaningful in fmt.Errorf.
		log.Fatalf("error: %v", err)
	}
	if len(datapt) == 0 {
		continue // nothing to parse
	}
	// Expected payload: "<zipcode> <temperature> <relhumidity>".
	temps := strings.Split(string(datapt[0]), " ")
	if len(temps) < 2 {
		continue // malformed update: no temperature field
	}
	temp, err := strconv.ParseInt(temps[1], 10, 64)
	if err == nil {
		fmt.Printf("received %s\n", datapt[0])
		totalTemp += temp
		samples++
	}
}
if samples > 0 {
	fmt.Printf("average temp was %d", totalTemp/samples)
} else {
	fmt.Println("no valid samples received")
}
This fails at a random point on every run. But if you use a poller, it never fails, like this:
// Weather subscriber using a poller: waits up to two seconds per iteration
// for the SUB socket to become readable, then receives the update. Prints the
// average of the temperatures that could be parsed.
socket, err := goczmq.NewSub("tcp://127.0.0.1:5556", "59937")
if err != nil {
	log.Fatal(err)
}
poller, err := goczmq.NewPoller(socket)
if err != nil {
	log.Fatal(err)
}
defer socket.Destroy()
// Deferred after the socket so it runs first: the poller is released before
// the socket it watches.
defer poller.Destroy()
// samples counts only the updates that were parsed successfully; iterations
// that time out or carry malformed data must not affect the average.
var totalTemp, samples int64
for i := 0; i < 101; i++ {
	fmt.Println("receiving message...")
	s := poller.Wait(2000)
	if s == nil {
		// No socket became readable within the timeout; try again.
		continue
	}
	datapt, err := s.RecvMessage()
	if err != nil {
		// %v, not %w: the %w verb is only meaningful in fmt.Errorf.
		log.Fatalf("error: %v", err)
	}
	if len(datapt) == 0 {
		continue // nothing to parse
	}
	// Expected payload: "<zipcode> <temperature> <relhumidity>".
	temps := strings.Split(string(datapt[0]), " ")
	if len(temps) < 2 {
		continue // malformed update: no temperature field
	}
	temp, err := strconv.ParseInt(temps[1], 10, 64)
	if err == nil {
		fmt.Printf("received %s\n", datapt[0])
		totalTemp += temp
		samples++
	}
}
if samples > 0 {
	fmt.Printf("average temp was %d", totalTemp/samples)
} else {
	fmt.Println("no valid samples received")
}
Interestingly enough, poller.Wait sometimes returns nil. I suspect it might be related?