nsq
nsq copied to clipboard
[问] 怎么发送有序消息?
我看go-nsq代码发现有 PublishOrdered方法, 但没找到例子。partitionKey 这个参数应该传什么值呢?
PublishOrdered(topic string, partitionKey []byte, body []byte)
partitionKey
传入用于分区的数据, 比如订单号, 或者用户id, 看你们想要以哪些数据为顺序, 一些客户端接口使用例子可以参考压测工具里面的代码 bench/multi_bench/multi_bench.go
是不是指定相同partitionKey的消息会存到同一个partition上,消费的时候就是顺序的了
是的, 然后使用顺序消费接收的消息就是按照partition内的顺序的了.
顺序消费需要特别调用什么方法吗?
消费者指定 config.EnableOrdered = true
, 生产方指定顺序生产的hash分区算法.
config.Hasher = murmur3.New32()
哦,了解 还有一个问题,如果要消费指定的数据怎么做的,我要记下每次消费的offset吗?
offset是服务端记录的, 客户端不用记录, 下次服务端会自动从上次的位置推送
@absolute8511 如果我新加入一个consumer, 想从某个消息开始消费该怎么写呢
另外有几个问题: https://github.com/youzan/nsq/blob/master/nsqdserver/context.go#L353 这个接口,如果chan里一直有消息推送进来, 是不是会一直append到pubInfoList
https://github.com/youzan/nsq/blob/master/consistence/nsqd_coordinator_cluster_write.go#L397 每条推送消息会同步到所有ISR,半数以上成功才算成功,会不会慢呢
会append到pubInfoList, 然后下次批量写入, chan取出来还是很快的, 一旦取完就会执行写入操作了, 所以不会一直进来. 后面防止一次批量太多, 可能会做最大批量限制. 这样批量提交就解决了每条推送消息会同步到所有ISR的性能问题.
consumer指定消费位置需要调用API: "http://127.0.0.1:4151/channel/setoffset", 参数参考 refer
有序topic其实是把同一个hashkey的消息,存在一个partition, 如果用比如roomID作为key , 是不是所有连接这个roomID的用户都连接到了同一个partition上了,如果roomID对应用户很多的情况会不会影响性能
@absolute8511 谢谢解答,学习了
是的 , 所以roomID要设计的分散一点, 顺序的情况下性能很大程度受限于同一个hashkey的消息数量
消息生产 (Golang, Java)
Golang示例
topics := ["test_topic"]
lookupAddress := "127.0.0.1:4161"
config := nsq.NewConfig()
// 启用顺序生产
config.EnableOrdered = true
// 默认pub返回的id, offset, rawSize数据无效, 当需要跟踪或者调试的时候, 可以开启
// trace, 这时id, offset, rawSize会返回服务端写入的队列位置便于跟踪. 注意不要一直开启
// 影响写入性能.
// config.EnableTrace = true
// 顺序生产的分区hash算法, 针对pub传入的sharding key做hash分区, 保证同样的订单id落到同一个分区保证顺序,
// 并且保证不同的订单id, 均匀的分散到多个分区保证不同订单id的并发能力
config.Hasher = murmur3.New32()
pubMgr, err := nsq.NewTopicProducerMgr(topics, config)
if err != nil {
log.Printf("init error : %v", err)
return
}
pubMgr.SetLogger(log.New(os.Stderr, "", log.LstdFlags), nsq.LogLevelInfo)
err = pubMgr.ConnectToNSQLookupd(lookupAddress)
if err != nil {
log.Printf("lookup connect error : %v", err)
return
}
var id nsq.NewMessageID
var offset uint64
var rawSize uint32
// 生产顺序消息, orderID就是传入需要保证顺序的订单id, 注意必须保证相同的orderID不会产生并发写入
id, offset, rawSize, err = pubMgr.PublishOrdered(topic, orderID, msg)
生产顺序消息, orderID就是传入需要保证顺序的订单id, 注意必须保证相同的orderID不会产生并发写入
如果并发写入会有什么影响呢? @absolute8511
并发写入, 写入的顺序就得不到保障了, 所以需要锁保护, 或者使用固定线程派发
明白了,多谢
@absolute8511 有序队列是不是同一个channel 同一个 partition 可以存在多个consumer ? 你们会出现这种情况? 因为消息处理有快慢之分,多个consumer存在的时候有序消息也会乱序了
可以有多个consumer, 普通消费可以提高并发度, 如果是顺序消费不会乱序, 服务端会强制控制消费并发为1
将kafka的设计嫁接在了nsq上啊
顺序消费,是不是 // 注册并发消费处理函数 consumer.AddConcurrentHandlers(dm, concurrency) , 这个不管有多少个concurrency,最终只会有一个进行处理,否则多个处理的话,也会是乱序的?
发现这个顺序消费,性能上比较差,有啥解决办法呀?
可以有多个consumer, 普通消费可以提高并发度, 如果是顺序消费不会乱序, 服务端会强制控制消费并发为1
是不是说顺序消费,nsqd会强制将并发设置为1,不管消费者设置了多少个并发?
@awinlei 是的, 可以通过增加分区数增加并发度, 顺序的分区内并发永远是1, 不同分区可以并发
@awinlei 是的, 可以通过增加分区数增加并发度, 顺序的分区内并发永远是1, 不同分区可以并发
你好,我看到“ 关于topic的分区限制
由于老版本没有完整的分区概念, 每台机子上面只能有一个同名的topic, 因此新版本为了兼容老版本的客户端, 对topic的partition数量做了限制, 每台机子每个topic只能有一个分区(含副本). 因此创建topic时的分区数*副本数应该不能大于集群的nsqd节点数. 对于顺序消费的topic无此限制, 因为老版本的客户端不支持顺序消费特性. ” 这说明顺序topic的分区数量是没有限制的,但是我在admin界面创建的时候,还有源码里看到最大的分区是254, http.go: MAX_PARTITION_NUM = 255
这也说明顺序消费的最大并发是255,对吗?
是的, 目前限制时255
可以有多个consumer, 普通消费可以提高并发度, 如果是顺序消费不会乱序, 服务端会强制控制消费并发为1
@absolute8511 没理解你这句话的意思,顺序时为什么一个分区可以有多个消费者来提高并发度,并且还在服务端设置了每次分发一条消息到消费者端?这样订阅了该分区的其他消费者不是空闲了吗,kafka处理顺序要么一个消费者一个分区,要么多个消费者订阅一个分区,但是内部有内存队列来控制
时为什么一个分区可以有多个消费者来提高并发度,并且还在服务端设置了每次分发一条消息到消费者端?这样订阅了该分区的其他消费
提高并发度说的是普通消费, 不是顺序消费. 顺序消费因为一次只有一个能收消息(会有空闲, 好处是单个消费挂掉不用做消费平衡)
谢谢!明白了。