go-queue icon indicating copy to clipboard operation
go-queue copied to clipboard

fix: producerNode.Delay func not wrap()

Open ch3nnn opened this issue 11 months ago • 1 comments

dq.NewProducerNode("localhost:11300", "tube") producer through Delay() send success. but consumer consume unwrap() error, because no wrap() func

help to review it. thank!

@kevwan @kesonan

ch3nnn avatar Mar 05 '24 08:03 ch3nnn

example:

package tasklogic

import (
	"fmt"
	"testing"
	"time"

	"github.com/zeromicro/go-queue/dq"
	"github.com/zeromicro/go-zero/core/stores/redis"
)

func TestProducer(t *testing.T) {
	producerNode := dq.NewProducerNode("localhost:11300", "tube")
	_, err := producerNode.Delay([]byte("hello node"), time.Second*5)
	if err != nil {
		fmt.Println(err)
	}
	_, err = producerNode.At([]byte("at node"), time.Now().Add(time.Second*10))
	if err != nil {
		fmt.Println(err)
	}

	producerCluster := dq.NewProducer([]dq.Beanstalk{
		{
			Endpoint: "localhost:11300",
			Tube:     "tube",
		},
		{
			Endpoint: "localhost:11301",
			Tube:     "tube",
		},
	})

	// 延迟 5s 后处理
	_, err = producerCluster.Delay([]byte("hello cluster"), time.Second*5)
	if err != nil {
		fmt.Println(err)
	}

	// 在指定时间点处理
	_, err = producerCluster.At([]byte("at cluster"), time.Now().Add(time.Second*10))
	if err != nil {
		fmt.Println(err)
	}
}

func TestConsumer(t *testing.T) {
	consumer := dq.NewConsumer(dq.DqConf{
		Beanstalks: []dq.Beanstalk{
			{
				Endpoint: "localhost:11300",
				Tube:     "tube",
			},
		},
		Redis: redis.RedisConf{
			Host: "localhost:6379",
			Type: redis.NodeType,
		},
	})
	consumer.Consume(func(body []byte) {
		fmt.Println(string(body))
	})
}

Console output


{"@timestamp":"2024-03-05T16:50:56.707+08:00","caller":"dq/consumer.go:50","content":"discarded: \"\"","level":"error"}
hello cluster
{"@timestamp":"2024-03-05T16:51:00.715+08:00","caller":"dq/consumer.go:50","content":"discarded: \"\"","level":"error"}
at cluster

ch3nnn avatar Mar 05 '24 08:03 ch3nnn