
Message corruption with `deliver_after`


Describe the bug

Sending a message with compression and batching enabled, together with deliver_after, results in a corrupted message. Disabling compression resolves the problem.

To Reproduce

Steps to reproduce the behavior:

  1. Create a Pulsar Consumer subscribed to some topic
  2. Start the consumer
  3. Create a Pulsar Producer on the same topic with LZ4 compression enabled (see Additional context)
  4. Call the producer.send_async method with deliver_after set to a valid timedelta (see Additional context)

The Consumer will now log something similar to:

ERROR [140072584582912] ConsumerImpl:534 | [persistent://public/default/test, test, 0] Failed to decompress message with 6 at 96981:3
ERROR [140072584582912] ConsumerImpl:546 | [persistent://public/default/test, test, 0] Discarding corrupted message at 96981:3

Expected behavior

Instead of throwing an error, the message should reach the consumer uncorrupted.

Desktop:

  • OS: Ubuntu 20.04.2 LTS
  • CPU: Intel Core i7-8750H
  • RAM: 16 GB DDR4
  • Disk: 1 TB NVMe SSD (with over 100 GB free on the partition)
  • Nvidia dGPU (20th gen)

Additional context

Using the Python Pulsar client version 2.7.0 (Python 3.8) and running Pulsar version 2.6.0 via Docker.

Consumer options:

{
    "consumer_type": pulsar.ConsumerType.Shared,
    "initial_position": pulsar.InitialPosition.Earliest,
}

Producer options:

{
    "compression_type": pulsar.CompressionType.LZ4,
    "batching_enabled": True,
    "block_if_queue_full": True,
    "send_timeout_millis": 0
}

Producer call:

producer.send_async(
    content=b'test',
    callback=lambda res, msg_id: print(res),
    partition_key=None,
    deliver_after=datetime.timedelta(seconds=5.0)
)
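
Putting the pieces above together, a minimal end-to-end reproduction might look like the following sketch (the broker URL, topic, and subscription name are placeholders, not from the original report):

import datetime
import pulsar

# Assumes a local standalone broker; adjust the service URL as needed.
client = pulsar.Client('pulsar://localhost:6650')

consumer = client.subscribe(
    topic='test',
    subscription_name='sub',
    consumer_type=pulsar.ConsumerType.Shared,
    initial_position=pulsar.InitialPosition.Earliest,
)

# Compression + batching + deliver_after is the failing combination.
producer = client.create_producer(
    topic='test',
    compression_type=pulsar.CompressionType.LZ4,
    batching_enabled=True,
    block_if_queue_full=True,
    send_timeout_millis=0,
)

producer.send_async(
    content=b'test',
    callback=lambda res, msg_id: print(res),
    partition_key=None,
    deliver_after=datetime.timedelta(seconds=5.0),
)

# With the bug present, the consumer logs "Failed to decompress message"
# and discards the message, so this receive() never gets the payload.
msg = consumer.receive()
print(msg.value())

client.close()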

NSocec-Mindsmiths avatar Sep 03 '21 15:09 NSocec-Mindsmiths

@NSocec-Mindsmiths Does the compression issue only happen when sending delayed messages? @BewareMyPower Do we have test coverage for this case in the C++ client?

codelipenghui avatar Sep 06 '21 14:09 codelipenghui

@codelipenghui No. The existing tests don't cover the compression case.
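
For illustration, a regression test covering this combination might look roughly like the pytest-style sketch below (hypothetical: the test name, topic, and broker URL are assumptions, and a standalone broker is assumed to be running locally):

import datetime
import pulsar

def test_delayed_delivery_with_compression_and_batching():
    # Hypothetical regression test sketch; assumes a broker at localhost:6650.
    client = pulsar.Client('pulsar://localhost:6650')
    topic = 'test-delayed-compressed-batched'
    consumer = client.subscribe(
        topic=topic,
        subscription_name='sub',
        # Delayed delivery requires a Shared subscription.
        consumer_type=pulsar.ConsumerType.Shared,
    )
    producer = client.create_producer(
        topic=topic,
        compression_type=pulsar.CompressionType.LZ4,
        batching_enabled=True,
    )
    producer.send(b'test', deliver_after=datetime.timedelta(seconds=1))
    # The message must arrive intact instead of being discarded as corrupted.
    msg = consumer.receive(timeout_millis=10000)
    assert msg.data() == b'test'
    client.close()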

BewareMyPower avatar Sep 07 '21 01:09 BewareMyPower

I cannot reproduce this issue in my local env. I tried both the latest Pulsar and Pulsar 2.6.0 with the latest Python client.

Could you try the latest Python client (built from source) or 2.8.0?

BewareMyPower avatar Sep 07 '21 05:09 BewareMyPower

Here is my test script:

import datetime
import faulthandler
import pulsar

faulthandler.enable()

if __name__ == '__main__':
    client = pulsar.Client('pulsar://localhost:6650')
    topic = 'my-topic-py'
    consumer = client.subscribe(
            topic=topic,
            subscription_name='sub',
            consumer_type=pulsar.ConsumerType.Shared,
            initial_position=pulsar.InitialPosition.Earliest,
    )
    producer = client.create_producer(
            topic=topic,
            compression_type=pulsar.CompressionType.LZ4,
            block_if_queue_full=True,
            send_timeout_millis=0
    )
    producer.send_async(
            content='test'.encode('utf-8'),
            callback=lambda res, msg_id: print(res),
            partition_key=None,
            deliver_after=datetime.timedelta(seconds=5.0)
    )
    msg = consumer.receive()
    print(msg.value())

Output:

2021-09-07 13:32:35.643 INFO  [0x11aa64e00] Client:88 | Subscribing on Topic :my-topic-py
2021-09-07 13:32:35.644 INFO  [0x11aa64e00] ClientConnection:181 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2021-09-07 13:32:35.644 INFO  [0x11aa64e00] ConnectionPool:96 | Created connection for pulsar://localhost:6650
2021-09-07 13:32:35.647 INFO  [0x7000042a8000] ClientConnection:366 | [127.0.0.1:59114 -> 127.0.0.1:6650] Connected to broker
2021-09-07 13:32:35.653 INFO  [0x7000042a8000] HandlerBase:64 | [persistent://public/default/my-topic-py, sub, 0] Getting connection from pool
2021-09-07 13:32:35.674 INFO  [0x7000042a8000] ConsumerImpl:214 | [persistent://public/default/my-topic-py, sub, 0] Created consumer on broker [127.0.0.1:59114 -> 127.0.0.1:6650] 
2021-09-07 13:32:35.677 INFO  [0x7000042a8000] HandlerBase:64 | [persistent://public/default/my-topic-py, ] Getting connection from pool
2021-09-07 13:32:35.680 INFO  [0x7000042a8000] ProducerImpl:188 | [persistent://public/default/my-topic-py, ] Created producer on broker [127.0.0.1:59114 -> 127.0.0.1:6650] 
Ok
b'test'
2021-09-07 13:32:40.993 INFO  [0x11aa64e00] ProducerImpl:590 | Producer - [persistent://public/default/my-topic-py, standalone-0-2] , [batching  = off]
2021-09-07 13:32:40.993 WARN  [0x11aa64e00] ProducerImpl:113 | [persistent://public/default/my-topic-py, standalone-0-2] Destroyed producer which was not properly closed
2021-09-07 13:32:40.995 ERROR [0x7000042a8000] ClientConnection:572 | [127.0.0.1:59114 -> 127.0.0.1:6650] Read failed: Operation canceled
2021-09-07 13:32:40.995 INFO  [0x11aa64e00] ClientConnection:1495 | [127.0.0.1:59114 -> 127.0.0.1:6650] Connection closed

I cannot install Python client 2.7.0 in my local env, so I didn't test it. The pip3 install got stuck at:

Building wheels for collected packages: grpcio, ratelimit
  Building wheel for grpcio (setup.py) ... |

BewareMyPower avatar Sep 07 '21 05:09 BewareMyPower

@codelipenghui Yes, the issue only happens when using delays.

NSocec-Mindsmiths avatar Sep 07 '21 07:09 NSocec-Mindsmiths

@BewareMyPower I ran your code and it worked properly on my setup too. However, your producer does not have batching_enabled set to True. When I set batching_enabled to True, the issue happens again.

NSocec-Mindsmiths avatar Sep 07 '21 07:09 NSocec-Mindsmiths

I reproduced the error successfully with batching enabled:

    producer = client.create_producer(
            topic=topic,
            batching_enabled=True,
            compression_type=pulsar.CompressionType.LZ4,
            block_if_queue_full=True,
            send_timeout_millis=0
    )

It's weird because batching should be enabled by default. I cannot reproduce the bug with the underlying C++ client, so I removed the component/c++ label for now. It might be something wrong with the Python wrapper; I'll investigate this bug soon.
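
In the meantime, a workaround consistent with the observations in this thread is to avoid combining compression with batching when using deliver_after. A sketch, reusing client and topic from the script above (dropping batching instead also worked in the earlier test):

# Keep batching but disable compression; per the report,
# disabling compression avoids the corruption.
producer = client.create_producer(
        topic=topic,
        compression_type=pulsar.CompressionType.NONE,  # the default
        batching_enabled=True,
        block_if_queue_full=True,
        send_timeout_millis=0,
)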

BewareMyPower avatar Sep 07 '21 07:09 BewareMyPower

I'm sorry, that was a misclick. I didn't mean to close the issue.

NSocec-Mindsmiths avatar Sep 07 '21 07:09 NSocec-Mindsmiths

The issue had no activity for 30 days, mark with Stale label.

github-actions[bot] avatar Mar 04 '22 02:03 github-actions[bot]