goczmq icon indicating copy to clipboard operation
goczmq copied to clipboard

Sometimes getting "frame error" on rx/tx, but retrying the method call fixes it

Open lynxzp opened this issue 2 years ago • 1 comments

Sometimes a call to RecvMessage() returns "recv frame error" and a call to SendFrame() returns "send frame error". Simply retrying the call fixes the problem.

Steps to reproduce: run this program (the for loop is the important part):

package main

import (
	"github.com/zeromq/goczmq"
	"log"
	"os"
)

// main reproduces the intermittent "recv frame error" / "send frame error"
// results. It binds a ROUTER socket on rxUrl, connects a DEALER socket to
// txUrl, seeds the exchange with one frame, and then forwards every received
// payload straight back out in a tight loop, retrying any call that fails and
// logging the iteration on which the failure happened.
//
// Usage: proxy rxUrl txUrl (both default to the local port 5555 endpoints).
func main() {
	rxUrl := "tcp://*:5555"
	txUrl := "tcp://127.0.0.1:5555"

	if len(os.Args) != 3 {
		log.Println("Usage: proxy rxUrl txUrl")
		log.Println("Using default values: proxy \"tcp://*:5555\" \"tcp://127.0.0.1:5555\"")
	} else {
		rxUrl = os.Args[1]
		txUrl = os.Args[2]
	}

	router, err := goczmq.NewRouter(rxUrl)
	if err != nil {
		log.Fatal(err)
	}
	defer router.Destroy()

	dealer, err := goczmq.NewDealer(txUrl)
	if err != nil {
		log.Fatal(err)
	}
	defer dealer.Destroy()

	// Seed the loop: without this first frame the router never has anything
	// to receive.
	err = dealer.SendFrame([]byte("123"), goczmq.FlagNone)
	if err != nil {
		log.Fatal(err)
	}

	seq := 0
	for {
		var data [][]byte
		seq++

		// Retry the receive until it succeeds; each failure is logged so the
		// frequency of the spurious error is visible.
		for {
			data, err = router.RecvMessage()
			if err == nil {
				break
			}
			log.Printf("rx error \"%v\" on iteration %v\n", err, seq)
		}

		// The payload is read from the second frame (a ROUTER socket is
		// expected to deliver the peer identity as the first one). Fail with
		// a clear message rather than an index-out-of-range panic if the
		// message is shorter than that.
		if len(data) < 2 {
			log.Fatalf("rx message has %d frame(s) on iteration %v, want at least 2", len(data), seq)
		}

		// Retry the send until it succeeds, logging each failure.
		for {
			err = dealer.SendFrame(data[1], goczmq.FlagNone)
			if err == nil {
				break
			}
			log.Printf("tx error \"%v\" on iteration %v\n", err, seq)
		}
	}
}

Sample output:

2022/06/02 16:17:18 Usage: proxy rxUrl txUrl
2022/06/02 16:17:18 Using default values: proxy "tcp://*:5555" "tcp://127.0.0.1:5555"
2022/06/02 16:17:18 rx error "recv frame error" on iteration 12506
2022/06/02 16:17:19 rx error "recv frame error" on iteration 32214
2022/06/02 16:17:19 rx error "recv frame error" on iteration 48934
2022/06/02 16:17:19 rx error "recv frame error" on iteration 71562
2022/06/02 16:17:20 rx error "recv frame error" on iteration 77118
2022/06/02 16:17:25 rx error "recv frame error" on iteration 342318
2022/06/02 16:17:25 rx error "recv frame error" on iteration 348342
2022/06/02 16:17:25 rx error "recv frame error" on iteration 351665
2022/06/02 16:17:26 tx error "send frame error" on iteration 365049
2022/06/02 16:17:26 rx error "recv frame error" on iteration 377613
2022/06/02 16:17:26 rx error "recv frame error" on iteration 393592
2022/06/02 16:17:27 rx error "recv frame error" on iteration 414721
2022/06/02 16:17:27 rx error "recv frame error" on iteration 424175
^C

lynxzp avatar Jun 02 '22 13:06 lynxzp

Seeing same behavior. You can reproduce it with this:

Server:

// Weather publisher: creates a PUB socket on the TCP and IPC endpoints and
// emits one "zipcode temperature relhumidity" update per second.
socket, err := goczmq.NewPub("tcp://*:5556,ipc://weather.ipc")
if err != nil {
	log.Fatal(err)
}
defer socket.Destroy()
rand.Seed(time.Now().UnixNano())
for {
	// The zip code is pinned to the value the client subscribes to, so every
	// update matches the subscription.
	zipcode := 59937 //rand.Intn(100000)
	temperature := rand.Intn(215) - 80
	relhumidity := rand.Intn(50) + 10

	msg := fmt.Sprintf("%d %d %d", zipcode, temperature, relhumidity)
	// Printf, not Println: Println would print the literal "%s".
	fmt.Printf("sending %s\n", msg)
	// Report send failures instead of discarding them; the next update is
	// attempted after the usual one-second pause.
	if err := socket.SendFrame([]byte(msg), goczmq.FlagNone); err != nil {
		log.Printf("send error: %v", err)
	}

	time.Sleep(1000 * time.Millisecond)
}

Client:

// Weather subscriber: receives updates for zip code 59937 with blocking
// RecvMessage calls and prints the average of the temperatures it parsed.
socket, err := goczmq.NewSub("tcp://127.0.0.1:5556", "59937")
if err != nil {
	log.Fatal(err)
}

defer socket.Destroy()

// samples counts the temperatures actually added to totalTemp so the average
// is not skewed by a hard-coded divisor or by skipped updates.
var totalTemp, samples int64
for i := 0; i < 101; i++ {
	fmt.Println("receiving message...")
	datapt, err := socket.RecvMessage()
	if err != nil {
		// %w is only understood by fmt.Errorf; %v prints the error text.
		log.Fatalf("error: %v", err)
	}
	if len(datapt) == 0 {
		log.Println("skipping message with no frames")
		continue
	}

	// Updates have the form "zipcode temperature relhumidity".
	temps := strings.Split(string(datapt[0]), " ")
	if len(temps) < 2 {
		log.Printf("skipping malformed update %q", datapt[0])
		continue
	}
	temp, err := strconv.ParseInt(temps[1], 10, 64)
	if err == nil {
		fmt.Printf("received %s\n", datapt[0])
		totalTemp += temp
		samples++
	}
}

if samples > 0 {
	fmt.Printf("average temp was %d", totalTemp/samples)
} else {
	fmt.Print("no temperature samples received")
}

This will fail randomly every time. But if you use a poller, like this, it never fails:

// Weather subscriber, poller variant: waits up to two seconds for the socket
// to become readable before each receive, then prints the average of the
// temperatures it parsed.
socket, err := goczmq.NewSub("tcp://127.0.0.1:5556", "59937")
if err != nil {
	log.Fatal(err)
}
defer socket.Destroy()

poller, err := goczmq.NewPoller(socket)
if err != nil {
	log.Fatal(err)
}
// Deferred after the socket's Destroy so the poller is released first.
defer poller.Destroy()

// samples counts the temperatures actually added to totalTemp so the average
// is not skewed by a hard-coded divisor, poll timeouts, or skipped updates.
var totalTemp, samples int64
for i := 0; i < 101; i++ {
	fmt.Println("receiving message...")
	s := poller.Wait(2000)
	if s == nil {
		// Nothing became readable during this wait; try again.
		continue
	}
	datapt, err := s.RecvMessage()
	if err != nil {
		// %w is only understood by fmt.Errorf; %v prints the error text.
		log.Fatalf("error: %v", err)
	}
	if len(datapt) == 0 {
		log.Println("skipping message with no frames")
		continue
	}

	// Updates have the form "zipcode temperature relhumidity".
	temps := strings.Split(string(datapt[0]), " ")
	if len(temps) < 2 {
		log.Printf("skipping malformed update %q", datapt[0])
		continue
	}
	temp, err := strconv.ParseInt(temps[1], 10, 64)
	if err == nil {
		fmt.Printf("received %s\n", datapt[0])
		totalTemp += temp
		samples++
	}
}

if samples > 0 {
	fmt.Printf("average temp was %d", totalTemp/samples)
} else {
	fmt.Print("no temperature samples received")
}

Interestingly enough, poller.Wait sometimes returns nil. I suspect it might be related?

nubunto avatar Jul 07 '22 16:07 nubunto