nsq icon indicating copy to clipboard operation
nsq copied to clipboard

[问] 怎么发送有序消息?

Open weisd opened this issue 6 years ago • 29 comments

我看go-nsq代码发现有 PublishOrdered方法, 但没找到例子。partitionKey 这个参数应该传什么值呢?

PublishOrdered(topic string, partitionKey []byte, body []byte)

weisd avatar Oct 31 '17 10:10 weisd

partitionKey 传入用于分区的数据, 比如订单号, 或者用户id, 看你们想要以哪些数据为顺序, 一些客户端接口使用例子可以参考压测工具里面的代码 bench/multi_bench/multi_bench.go

absolute8511 avatar Oct 31 '17 17:10 absolute8511

是不是指定相同partitionKey的消息会存到同一个partition上,消费的时候就是顺序的了

weisd avatar Nov 01 '17 01:11 weisd

是的, 然后使用顺序消费接收的消息就是按照partition内的顺序的了.

absolute8511 avatar Nov 01 '17 05:11 absolute8511

顺序消费需要特别调用什么方法吗?

weisd avatar Nov 01 '17 14:11 weisd

消费者指定 config.EnableOrdered = true, 生产方指定顺序生产的hash分区算法.

config.Hasher = murmur3.New32()

absolute8511 avatar Nov 01 '17 15:11 absolute8511

哦,了解 还有一个问题,如果要消费指定的数据怎么做的,我要记下每次消费的offset吗?

weisd avatar Nov 01 '17 15:11 weisd

offset是服务端记录的, 客户端不用记录, 下次服务端会自动从上次的位置推送

absolute8511 avatar Nov 01 '17 15:11 absolute8511

@absolute8511 如果我新加入一个consumer, 想从某个消息开始消费该怎么写呢

weisd avatar Nov 21 '17 09:11 weisd

另外有几个问题: https://github.com/youzan/nsq/blob/master/nsqdserver/context.go#L353 这个接口,如果chan里一直有消息推送进来, 是不是会一直append到pubInfoList

weisd avatar Nov 21 '17 09:11 weisd

https://github.com/youzan/nsq/blob/master/consistence/nsqd_coordinator_cluster_write.go#L397 每条推送消息会同步到所有ISR,半数以上成功才算成功,会不会慢呢

weisd avatar Nov 21 '17 09:11 weisd

会append到pubInfoList, 然后下次批量写入, chan取出来还是很快的, 一旦取完就会执行写入操作了, 所以不会一直进来. 后面防止一次批量太多, 可能会做最大批量限制. 这样批量提交就解决了每条推送消息会同步到所有ISR的性能问题.

consumer指定消费位置需要调用API: "http://127.0.0.1:4151/channel/setoffset", 参数参考 refer

absolute8511 avatar Nov 21 '17 09:11 absolute8511

有序topic其实是把同一个hashkey的消息,存在一个partition, 如果用比如roomID作为key , 是不是所有连接这个roomID的用户都连接到了同一个partition上了,如果roomID对应用户很多的情况会不会影响性能

weisd avatar Nov 21 '17 09:11 weisd

@absolute8511 谢谢解答,学习了

weisd avatar Nov 21 '17 09:11 weisd

是的 , 所以roomID要设计的分散一点, 顺序的情况下性能很大程度受限于同一个hashkey的消息数量

absolute8511 avatar Nov 21 '17 09:11 absolute8511

消息生产 (Golang, Java)

Golang示例

    topics := ["test_topic"]
    lookupAddress := "127.0.0.1:4161"
    config := nsq.NewConfig()
    // 启用顺序生产
	config.EnableOrdered = true
	// 默认pub返回的id, offset, rawSize数据无效, 当需要跟踪或者调试的时候, 可以开启
	// trace, 这时id, offset, rawSize会返回服务端写入的队列位置便于跟踪. 注意不要一直开启
	// 影响写入性能.
	// config.EnableTrace = true

    // 顺序生产的分区hash算法, 针对pub传入的sharding key做hash分区, 保证同样的订单id落到同一个分区保证顺序,
    // 并且保证不同的订单id, 均匀的分散到多个分区保证不同订单id的并发能力
	config.Hasher = murmur3.New32()
	pubMgr, err := nsq.NewTopicProducerMgr(topics, config)
	if err != nil {
		log.Printf("init error : %v", err)
		return
	}
	pubMgr.SetLogger(log.New(os.Stderr, "", log.LstdFlags), nsq.LogLevelInfo)
	err = pubMgr.ConnectToNSQLookupd(lookupAddress)
	if err != nil {
		log.Printf("lookup connect error : %v", err)
		return
	}
	var id nsq.NewMessageID
	var offset uint64
	var rawSize uint32
    // 生产顺序消息, orderID就是传入需要保证顺序的订单id, 注意必须保证相同的orderID不会产生并发写入
	id, offset, rawSize, err = pubMgr.PublishOrdered(topic, orderID, msg)

生产顺序消息, orderID就是传入需要保证顺序的订单id, 注意必须保证相同的orderID不会产生并发写入

如果并发写入会有什么影响呢? @absolute8511

weisd avatar Dec 22 '17 03:12 weisd

并发写入, 写入的顺序就得不到保障了, 所以需要锁保护, 或者使用固定线程派发

absolute8511 avatar Dec 22 '17 05:12 absolute8511

明白了,多谢

weisd avatar Dec 22 '17 06:12 weisd

@absolute8511 有序队列是不是同一个channel 同一个 partition 可以存在多个consumer ? 你们会出现这种情况? 因为消息处理有快慢之分,多个consumer存在的时候有序消息也会乱序了

sukui avatar Dec 22 '17 07:12 sukui

可以有多个consumer, 普通消费可以提高并发度, 如果是顺序消费不会乱序, 服务端会强制控制消费并发为1

absolute8511 avatar Dec 22 '17 09:12 absolute8511

将kafka的设计嫁接在了nsq上啊

guyannanfei25 avatar Dec 25 '17 06:12 guyannanfei25

顺序消费,是不是 // 注册并发消费处理函数 consumer.AddConcurrentHandlers(dm, concurrency) , 这个不管有多少个concurrency,最终只会有一个进行处理,否则多个处理的话,也会是乱序的?

awinlei avatar Oct 23 '19 08:10 awinlei

发现这个顺序消费,性能上比较差,有啥解决办法呀?

awinlei avatar Oct 23 '19 08:10 awinlei

可以有多个consumer, 普通消费可以提高并发度, 如果是顺序消费不会乱序, 服务端会强制控制消费并发为1

是不是说顺序消费,nsqd会强制将并发设置为1,不管消费者设置了多少个并发?

awinlei avatar Oct 23 '19 08:10 awinlei

@awinlei 是的, 可以通过增加分区数增加并发度, 顺序的分区内并发永远是1, 不同分区可以并发

absolute8511 avatar Oct 23 '19 12:10 absolute8511

@awinlei 是的, 可以通过增加分区数增加并发度, 顺序的分区内并发永远是1, 不同分区可以并发

你好,我看到“ 关于topic的分区限制

由于老版本没有完整的分区概念, 每台机子上面只能有一个同名的topic, 因此新版本为了兼容老版本的客户端, 对topic的partition数量做了限制, 每台机子每个topic只能有一个分区(含副本). 因此创建topic时的分区数*副本数应该不能大于集群的nsqd节点数. 对于顺序消费的topic无此限制, 因为老版本的客户端不支持顺序消费特性. ” 这说明顺序topic的分区数量是没有限制的,但是我在admin界面创建的时候,还有源码里看到最大的分区是254, http.go: MAX_PARTITION_NUM = 255

这也说明顺序消费的最大并发是255,对吗?

awinlei avatar Oct 28 '19 04:10 awinlei

是的, 目前限制时255

absolute8511 avatar Oct 28 '19 05:10 absolute8511

可以有多个consumer, 普通消费可以提高并发度, 如果是顺序消费不会乱序, 服务端会强制控制消费并发为1

@absolute8511 没理解你这句话的意思,顺序时为什么一个分区可以有多个消费者来提高并发度,并且还在服务端设置了每次分发一条消息到消费者端?这样订阅了该分区的其他消费者不是空闲了吗,kafka处理顺序要么一个消费者一个分区,要么多个消费者订阅一个分区,但是内部有内存队列来控制

WeiFrank avatar Jun 20 '20 08:06 WeiFrank

时为什么一个分区可以有多个消费者来提高并发度,并且还在服务端设置了每次分发一条消息到消费者端?这样订阅了该分区的其他消费

提高并发度说的是普通消费, 不是顺序消费. 顺序消费因为一次只有一个能收消息(会有空闲, 好处是单个消费挂掉不用做消费平衡)

absolute8511 avatar Jun 20 '20 10:06 absolute8511

谢谢!明白了。

WeiFrank avatar Jun 21 '20 04:06 WeiFrank