go-queue
go-queue copied to clipboard
fix: producerNode.Delay does not call wrap()
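When a producer is created with dq.NewProducerNode("localhost:11300", "tube")
and a message is sent through Delay(), the send succeeds,
but the consumer then fails in unwrap() when it consumes the message,
because the message was never passed through the wrap() func.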
Please help review it. Thanks!
@kevwan @kesonan
example:
package tasklogic
import (
"fmt"
"testing"
"time"
"github.com/zeromicro/go-queue/dq"
"github.com/zeromicro/go-zero/core/stores/redis"
)
// TestProducer publishes one delayed and one scheduled job through a single
// producer node, then does the same through a two-endpoint producer cluster.
//
// It talks to the Beanstalk endpoints localhost:11300 and localhost:11301 on
// tube "tube". Every publish error is reported with t.Errorf so that a failed
// send marks the test as failed instead of merely being printed; the
// remaining sends still run after a failure.
func TestProducer(t *testing.T) {
	producerNode := dq.NewProducerNode("localhost:11300", "tube")

	// Single node: process after a 5s delay.
	if _, err := producerNode.Delay([]byte("hello node"), time.Second*5); err != nil {
		t.Errorf("node Delay: %v", err)
	}

	// Single node: process at the specified point in time.
	if _, err := producerNode.At([]byte("at node"), time.Now().Add(time.Second*10)); err != nil {
		t.Errorf("node At: %v", err)
	}

	producerCluster := dq.NewProducer([]dq.Beanstalk{
		{
			Endpoint: "localhost:11300",
			Tube:     "tube",
		},
		{
			Endpoint: "localhost:11301",
			Tube:     "tube",
		},
	})

	// Cluster: process after a 5s delay.
	if _, err := producerCluster.Delay([]byte("hello cluster"), time.Second*5); err != nil {
		t.Errorf("cluster Delay: %v", err)
	}

	// Cluster: process at the specified point in time.
	if _, err := producerCluster.At([]byte("at cluster"), time.Now().Add(time.Second*10)); err != nil {
		t.Errorf("cluster At: %v", err)
	}
}
// TestConsumer builds a dq consumer for tube "tube" on the Beanstalk endpoint
// localhost:11300, configured with a node-type Redis at localhost:6379, and
// hands it a handler that prints every received body as a string.
func TestConsumer(t *testing.T) {
	conf := dq.DqConf{
		Beanstalks: []dq.Beanstalk{
			{Endpoint: "localhost:11300", Tube: "tube"},
		},
		Redis: redis.RedisConf{
			Host: "localhost:6379",
			Type: redis.NodeType,
		},
	}

	// Handler invoked by the consumer with each job body.
	printBody := func(body []byte) {
		fmt.Println(string(body))
	}

	dq.NewConsumer(conf).Consume(printBody)
}
Console output
{"@timestamp":"2024-03-05T16:50:56.707+08:00","caller":"dq/consumer.go:50","content":"discarded: \"\"","level":"error"}
hello cluster
{"@timestamp":"2024-03-05T16:51:00.715+08:00","caller":"dq/consumer.go:50","content":"discarded: \"\"","level":"error"}
at cluster