phpkafka
phpkafka copied to clipboard
配置新起的消费者组配置从头消费还是从最新的消息开始消费
- 你遇到了什么问题?
在新起的消费者组如果在broker端没有对应的消费位移的话,默认从offset为0开始消费,但分区前面的数据可能过期被删除了 对应的offset可能不是从0开始的 fetchMessage时候会报out_of_range,这时候看代码里获取了一次分区提交的最新偏移量从最新的消息开始消费。这样不能自己配置在新起消费者后是从头消费还是从最新的消息开始消费。
后期是否可以加一个功能,java版的是auto.offset.reset 对应的配置是earliest/latest,如果在broker里找不到对应的消费位移的话 就会根据这个配置 去取分区最早的偏移量或者最新的偏移量,这样可以在新起了消费者组后配置是从头开始消费还是从最新的消息开始消费。
- Kafka 环境是自建还是云服务?
自建
- 请执行下面的命令获取环境信息。
php -v & php --ri swoole & composer info | grep longlang/phpkafka
# 粘贴到这里
PHP 7.4.19 (cli) (built: May 31 2021 04:55:16) ( NTS )
Copyright (c) The PHP Group
Zend Engine v3.4.0, Copyright (c) Zend Technologies
with Yasd v0.3.8, Our Copyright, by codinghuang
swoole
Swoole => enabled
Author => Swoole Team <[email protected]>
Version => 4.4.15
Built => May 31 2021 05:59:33
coroutine => enabled
epoll => enabled
eventfd => enabled
signalfd => enabled
cpu_affinity => enabled
spinlock => enabled
rwlock => enabled
sockets => enabled
openssl => OpenSSL 1.0.2k-fips 26 Jan 2017
http2 => enabled
pcre => enabled
zlib => 1.2.7
mutex_timedlock => enabled
pthread_barrier => enabled
futex => enabled
async_redis => enabled
Directive => Local Value => Master Value
swoole.enable_coroutine => On => On
swoole.enable_library => On => On
swoole.enable_preemptive_scheduler => Off => Off
swoole.display_errors => On => On
swoole.use_shortname => On => On
swoole.unixsock_buffer_size => 8388608 => 8388608
Do not run Composer as root/super user! See https://getcomposer.org/root for details
Continue as root/super user [yes]? yes
longlang/phpkafka v1.1.4 A kafka client. Support php-fpm and Swoole.
- 提供最小可复现代码:
// 你的代码