pulsar-client-python
Message corruption with `deliver_after`
Describe the bug
Sending a message with compression and batching both enabled and with deliver_after set results in a corrupted message. Disabling compression resolves the problem.
To Reproduce
Steps to reproduce the behavior:
1. Create a Pulsar Consumer subscribed to some topic
2. Start the consumer
3. Create a Pulsar Producer on the same topic with LZ4 compression enabled (see Additional context)
4. Call the producer.send_async method with deliver_after set to a valid timedelta (see Additional context)
5. The Consumer will now log something similar to:
ERROR [140072584582912] ConsumerImpl:534 | [persistent://public/default/test, test, 0] Failed to decompress message with 6 at 96981:3
ERROR [140072584582912] ConsumerImpl:546 | [persistent://public/default/test, test, 0] Discarding corrupted message at 96981:3
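To check a longer consumer log for this symptom, the "Discarding corrupted message" line above can be matched with a regular expression. This is only a sketch based on the two log lines quoted here: the helper name `find_corrupted_messages` is hypothetical, and reading the two numbers after "at" as ledger ID and entry ID is an assumption.

```python
import re

# Pattern built from the "Discarding corrupted message" line quoted above.
# Assumption: the two numbers after "at" are the ledger ID and the entry ID.
CORRUPTED_RE = re.compile(
    r"\[(?P<topic>[^,\]]+), (?P<subscription>[^,\]]*), \d+\] "
    r"Discarding corrupted message at (?P<ledger>\d+):(?P<entry>\d+)"
)


def find_corrupted_messages(log_lines):
    """Return (topic, subscription, ledger, entry) for each discarded message."""
    found = []
    for line in log_lines:
        match = CORRUPTED_RE.search(line)
        if match:
            found.append((
                match.group("topic"),
                match.group("subscription"),
                int(match.group("ledger")),
                int(match.group("entry")),
            ))
    return found


sample = [
    "ERROR [140072584582912] ConsumerImpl:534 | [persistent://public/default/test, test, 0] "
    "Failed to decompress message with 6 at 96981:3",
    "ERROR [140072584582912] ConsumerImpl:546 | [persistent://public/default/test, test, 0] "
    "Discarding corrupted message at 96981:3",
]
print(find_corrupted_messages(sample))
```

Only the "Discarding" line is matched, so each dropped message is counted once even though the client logs two lines for it.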
Expected behavior
The message should reach the consumer uncorrupted, and no error should be logged.
Desktop (please complete the following information):
- OS: Ubuntu 20.04.2 LTS
- CPU: Intel i7 8750h
- RAM: 16 GB DDR4
- Disk: 1 TB NVMe SSD (with over 100 GB free on the partition)
- Nvidia dGPU (20th gen)
Additional context
Using the Python Pulsar client version 2.7.0 (Python 3.8) and running Pulsar version 2.6.0 via Docker.
Consumer options:
{
    "consumer_type": pulsar.ConsumerType.Shared,
    "initial_position": pulsar.InitialPosition.Earliest,
}
Producer options:
{
    "compression_type": pulsar.CompressionType.LZ4,
    "batching_enabled": True,
    "block_if_queue_full": True,
    "send_timeout_millis": 0
}
Producer call:
producer.send_async(
    content=b'test',
    callback=lambda res, msg_id: print(res),
    partition_key=None,
    deliver_after=datetime.timedelta(seconds=5.0)
)
@NSocec-Mindsmiths Does the compression issue only happen when sending delayed messages? @BewareMyPower Do we have test coverage for this case in the CPP client?
@codelipenghui No. The existing tests don't cover the compression case.
I cannot reproduce this issue in my local env. I tried both the latest Pulsar and Pulsar 2.6.0 with the latest Python client.
Could you try the latest Python client (built from source) or 2.8.0?
Here is my test script:
import datetime
import faulthandler

import pulsar

faulthandler.enable()

if __name__ == '__main__':
    client = pulsar.Client('pulsar://localhost:6650')
    topic = 'my-topic-py'
    consumer = client.subscribe(
        topic=topic,
        subscription_name='sub',
        consumer_type=pulsar.ConsumerType.Shared,
        initial_position=pulsar.InitialPosition.Earliest,
    )
    producer = client.create_producer(
        topic=topic,
        compression_type=pulsar.CompressionType.LZ4,
        block_if_queue_full=True,
        send_timeout_millis=0
    )
    producer.send_async(
        content='test'.encode('utf-8'),
        callback=lambda res, msg_id: print(res),
        partition_key=None,
        deliver_after=datetime.timedelta(seconds=5.0)
    )
    msg = consumer.receive()
    print(msg.value())
Output:
2021-09-07 13:32:35.643 INFO [0x11aa64e00] Client:88 | Subscribing on Topic :my-topic-py
2021-09-07 13:32:35.644 INFO [0x11aa64e00] ClientConnection:181 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2021-09-07 13:32:35.644 INFO [0x11aa64e00] ConnectionPool:96 | Created connection for pulsar://localhost:6650
2021-09-07 13:32:35.647 INFO [0x7000042a8000] ClientConnection:366 | [127.0.0.1:59114 -> 127.0.0.1:6650] Connected to broker
2021-09-07 13:32:35.653 INFO [0x7000042a8000] HandlerBase:64 | [persistent://public/default/my-topic-py, sub, 0] Getting connection from pool
2021-09-07 13:32:35.674 INFO [0x7000042a8000] ConsumerImpl:214 | [persistent://public/default/my-topic-py, sub, 0] Created consumer on broker [127.0.0.1:59114 -> 127.0.0.1:6650]
2021-09-07 13:32:35.677 INFO [0x7000042a8000] HandlerBase:64 | [persistent://public/default/my-topic-py, ] Getting connection from pool
2021-09-07 13:32:35.680 INFO [0x7000042a8000] ProducerImpl:188 | [persistent://public/default/my-topic-py, ] Created producer on broker [127.0.0.1:59114 -> 127.0.0.1:6650]
Ok
b'test'
2021-09-07 13:32:40.993 INFO [0x11aa64e00] ProducerImpl:590 | Producer - [persistent://public/default/my-topic-py, standalone-0-2] , [batching = off]
2021-09-07 13:32:40.993 WARN [0x11aa64e00] ProducerImpl:113 | [persistent://public/default/my-topic-py, standalone-0-2] Destroyed producer which was not properly closed
2021-09-07 13:32:40.995 ERROR [0x7000042a8000] ClientConnection:572 | [127.0.0.1:59114 -> 127.0.0.1:6650] Read failed: Operation canceled
2021-09-07 13:32:40.995 INFO [0x11aa64e00] ClientConnection:1495 | [127.0.0.1:59114 -> 127.0.0.1:6650] Connection closed
I cannot install Python client 2.7.0 in my local env, so I didn't test it. The pip3 install got stuck at:
Building wheels for collected packages: grpcio, ratelimit
Building wheel for grpcio (setup.py) ... |
@codelipenghui Yes, the issue only happens when using delays.
@BewareMyPower I ran your code and it worked properly on my setup too. However, your producer does not have batching_enabled set to True. When I set batching_enabled to True, the issue happens again.
I reproduced the error successfully with batching enabled:
producer = client.create_producer(
    topic=topic,
    batching_enabled=True,
    compression_type=pulsar.CompressionType.LZ4,
    block_if_queue_full=True,
    send_timeout_millis=0
)
It's weird because batching should be enabled by default. I cannot reproduce the bug with the underlying C++ client, so I removed the component/c++ label for now. It might be something wrong with the Python wrapper, and I'll investigate this bug soon.
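Until the cause is found, the observations in this thread suggest a possible workaround: delayed messages arrived intact either with compression disabled or with batching not enabled. The sketch below only adjusts an options dict like the one in the original report; the helper name `options_for_delayed_delivery` and its `keep` parameter are hypothetical, and the string "LZ4" stands in for pulsar.CompressionType.LZ4 so the example runs without a broker. It is not a confirmed fix.

```python
def options_for_delayed_delivery(producer_options, keep="compression"):
    """Return a copy of producer_options that avoids the failing combination.

    keep="compression": keep compression and turn batching off.
    keep="batching":    keep batching and drop the compression setting,
                        which leaves the client's default (no compression).
    """
    safe = dict(producer_options)  # do not mutate the caller's dict
    if keep == "compression":
        safe["batching_enabled"] = False
    elif keep == "batching":
        safe.pop("compression_type", None)
    else:
        raise ValueError("keep must be 'compression' or 'batching'")
    return safe


# Options from the original report; "LZ4" is a placeholder for
# pulsar.CompressionType.LZ4 (assumption made to keep this runnable offline).
reported_options = {
    "compression_type": "LZ4",
    "batching_enabled": True,
    "block_if_queue_full": True,
    "send_timeout_millis": 0,
}

safe_options = options_for_delayed_delivery(reported_options)
print(safe_options)
```

With a real client, the adjusted dict would be passed on as `client.create_producer(topic, **safe_options)`, using the actual pulsar.CompressionType.LZ4 value in place of the placeholder string.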
I'm sorry, that was a misclick. I didn't mean to close the issue.
The issue had no activity for 30 days, mark with Stale label.