pulsar-client-go icon indicating copy to clipboard operation
pulsar-client-go copied to clipboard

consume stuck after seekByTiime

Open xige-16 opened this issue 3 years ago • 4 comments

Expected behavior

consume success after seekByTime

Actual behavior

After running the for loop a few times, step msg := <-consumer.Chan() will be blocked.

Steps to reproduce

func TestPulsarChannel2(t *testing.T) {
	Params.Init()
	c, err := pulsar.NewClient(pulsar.ClientOptions{URL: Params.PulsarAddress})
	assert.Nil(t, err)
	for i := 0; i < 100; i++ {
		channel := strconv.Itoa(rand.Int())

		p, err := c.CreateProducer(pulsar.ProducerOptions{Topic: channel})
		assert.Nil(t, err)
		ppm := &pulsar.ProducerMessage{Payload: []byte{1}, Properties: map[string]string{}}
		_, err = p.Send(context.TODO(), ppm)
		assert.Nil(t, err)

		p.Close()

		consumer, err := c.Subscribe(pulsar.ConsumerOptions{
			Topic:                       channel,
			SubscriptionName:            "test",
			Type:                        pulsar.KeyShared,
			SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		})
		assert.Nil(t, err)
		consumer.SeekByTime(time.Unix(0, 0))
		msg := <-consumer.Chan()
		assert.EqualValues(t, []byte{1}, msg.Payload)

		consumer.Close()
	}
}

System configuration

Pulsar version: x.y

xige-16 avatar Aug 16 '21 07:08 xige-16

Which client version are you using?

cckellogg avatar Aug 17 '21 00:08 cckellogg

Which client version are you using?

v0.5.0

xige-16 avatar Aug 17 '21 01:08 xige-16

Do you see the same issue with v0.6.0?

cckellogg avatar Aug 24 '21 19:08 cckellogg

v 0.6.0 is ok

xige-16 avatar Jan 05 '22 03:01 xige-16