pulsar-client-python

The producer performance of pulsar-client v3.5.0 is much lower than that of v2.10.2

Status: Open. Jayer23 opened this issue 1 year ago · 3 comments

When sending the same batch of data (about 100 MB) with pulsar-client v3.5.0 and with v2.10.2, v3.5.0 takes about 3.5 times as long as v2.10.2.

Core Code

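import asyncio

import pulsar

# pulsar_infos and send_callback are defined elsewhere in the reporter's program.


async def consume_pulsar(q: asyncio.Queue, url: str, size: int):
    cli, pro = None, None
    # Note: each iteration replaces cli/pro, so only the client and producer
    # created for the last entry in pulsar_infos are used below.
    for info in pulsar_infos:
        cli = pulsar.Client(
            io_threads=4,
            service_url=info.service_url)

        pro = cli.create_producer(
            info.topic,
            schema=pulsar.schema.StringSchema(),
            compression_type=pulsar.CompressionType.LZ4,
            batching_enabled=True,
            batching_max_allowed_size_in_bytes=1048576,
            max_pending_messages=10000,
            max_pending_messages_across_partitions=50000,
            batching_max_publish_delay_ms=10,
            block_if_queue_full=True
        )

    while True:
        # Each queue item is a list of strings; None signals the end of input.
        lines = await q.get()
        if lines is None:
            break

        # send_async() is a regular (non-awaitable) call; this is the line
        # that line_profiler reports as taking more than 97% of the time.
        for line in lines:
            pro.send_async(line, callback=send_callback)

    # Flush buffered messages and wait for them to be persisted, then shut down.
    pro.flush()
    cli.close()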
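consume_pulsar reads batches of lines from an asyncio.Queue until it receives None. The feeding side is not shown in the issue, so the following is only a minimal sketch of that sentinel protocol: `feed`, `drain` and `main` are hypothetical names, and a plain list's `append` stands in for the Pulsar producer so the sketch runs without a broker.

```python
import asyncio
from typing import Callable, Iterable, List, Optional


async def feed(q: "asyncio.Queue[Optional[List[str]]]", batches: Iterable[List[str]]) -> None:
    # Hypothetical feeder: push each batch of lines, then a None sentinel.
    for batch in batches:
        await q.put(batch)
    await q.put(None)


async def drain(q: "asyncio.Queue[Optional[List[str]]]", send: Callable[[str], None]) -> int:
    # Mirrors the loop in consume_pulsar: stop on None, otherwise send every line.
    sent = 0
    while True:
        lines = await q.get()
        if lines is None:
            break
        for line in lines:
            send(line)  # stands in for pro.send_async(line, callback=send_callback)
            sent += 1
    return sent


async def main() -> List[str]:
    q: "asyncio.Queue[Optional[List[str]]]" = asyncio.Queue(maxsize=8)
    out: List[str] = []
    batches = [[f"line-{i}-{j}" for j in range(3)] for i in range(4)]
    _, sent = await asyncio.gather(feed(q, batches), drain(q, out.append))
    assert sent == len(out)
    return out


if __name__ == "__main__":
    print(len(asyncio.run(main())))  # 12
```

Because the send call inside the loop is synchronous, a whole batch is sent without yielding to the event loop; the feeder only gets to run again when `drain` awaits `q.get()`.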

3.5.0: [line_profiler output]

2.10.2: [line_profiler output]

The output above is from line_profiler. Almost all of the time (more than 97%) is spent in pro.send_async(line, callback=send_callback): about 127 s with pulsar-client v3.5.0 versus about 35.6 s with v2.10.2.
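As a quick arithmetic check, the two profiled send_async times reported above give the "about 3.5 times" figure quoted at the top of the issue:

```python
# Profiled time spent in pro.send_async(), as reported above (seconds).
profiled = {"3.5.0": 127.0, "2.10.2": 35.6}

slowdown = profiled["3.5.0"] / profiled["2.10.2"]
print(f"send_async slowdown: {slowdown:.2f}x")  # send_async slowdown: 3.57x
```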

Reproduce

Demo

import os

import pulsar

PULSAR_TOPIC = "persistent://benchmark/test/test"
PULSAR_URL = "pulsar://localhost:6650"


def send_callback(result, msg_id):
    # No-op: the benchmark only measures send throughput.
    return


def produce(msg: str, size: int):
    cli = pulsar.Client(
        io_threads=4,
        service_url=PULSAR_URL)

    pro = cli.create_producer(
        PULSAR_TOPIC,
        schema=pulsar.schema.StringSchema(),
        compression_type=pulsar.CompressionType.LZ4,
        batching_enabled=True,
        batching_max_allowed_size_in_bytes=1048576,
        max_pending_messages=10000,
        max_pending_messages_across_partitions=50000,
        batching_max_publish_delay_ms=10,
        block_if_queue_full=True
    )

    # Send the same message `size` times without waiting for each ack.
    for _ in range(size):
        pro.send_async(msg, callback=send_callback)

    # Flush buffered messages and wait for them to be persisted, then shut down.
    pro.flush()
    cli.close()


if __name__ == "__main__":
    msg = os.urandom(100).hex()  # 100 random bytes -> 200 hex characters
    size = 1000000
    produce(msg, size)
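To see where the time goes without line_profiler, one option is to time the send_async loop and the final flush separately. The helper below is a hypothetical sketch: `time_send_loop` and `FakeProducer` are illustrative names, not part of pulsar-client. It only assumes an object with `send_async(msg, callback=...)` and `flush()` methods, as the pulsar Producer has; the fake producer lets it run without a broker.

```python
import time
from typing import Any, Callable, Dict


def time_send_loop(producer: Any, msg: str, size: int,
                   callback: Callable[[Any, Any], None]) -> Dict[str, float]:
    """Hypothetical helper: time the send_async() loop and flush() separately."""
    t0 = time.perf_counter()
    for _ in range(size):
        producer.send_async(msg, callback=callback)
    t1 = time.perf_counter()
    producer.flush()
    t2 = time.perf_counter()
    loop_s = t1 - t0
    return {
        "send_async_loop_s": loop_s,
        "flush_s": t2 - t1,
        "msgs_per_s": size / loop_s if loop_s > 0 else float("inf"),
    }


class FakeProducer:
    """Stand-in for pulsar.Producer that only counts messages (assumption)."""

    def __init__(self) -> None:
        self.sent = 0
        self.flushed = False

    def send_async(self, msg: str, callback: Callable[[Any, Any], None]) -> None:
        self.sent += 1
        callback(None, None)  # a real producer calls this later with (result, msg_id)

    def flush(self) -> None:
        self.flushed = True


if __name__ == "__main__":
    fake = FakeProducer()
    stats = time_send_loop(fake, "x" * 200, 10_000, lambda res, msg_id: None)
    print(fake.sent, fake.flushed, sorted(stats))
```

With the real client, the loop and `flush()` in `produce()` could be replaced by `time_send_loop(pro, msg, size, send_callback)` and the result printed for each installed pulsar-client version.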

Result

$ pip3 list | grep pulsar-client
pulsar-client       3.5.0
$ time python3 test_pulsar.py > 1

real	1m2.925s
user	1m1.735s
sys	1m1.803s
$ pip3 install pulsar-client==2.10.2
$ time python3 test_pulsar.py > 1
real	0m14.575s
user	0m16.140s
sys	0m7.578s
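The wall-clock (`real`) times above can be turned into throughput figures. This sketch assumes every message is the 200-character hex string built from `os.urandom(100)` in the demo and reports the uncompressed payload rate (before LZ4):

```python
import os

size = 1_000_000
msg_len = len(os.urandom(100).hex())  # 100 random bytes -> 200 hex characters
real_s = {"3.5.0": 62.925, "2.10.2": 14.575}  # `real` times from the runs above

for version, secs in real_s.items():
    rate = size / secs
    mb_per_s = size * msg_len / secs / 1e6  # uncompressed payload, decimal MB
    print(f"{version}: {rate:,.0f} msg/s, {mb_per_s:.1f} MB/s")

print(f"wall-clock slowdown: {real_s['3.5.0'] / real_s['2.10.2']:.2f}x")
```

By this measure the end-to-end gap (about 4.3x) is somewhat larger than the 3.5x seen in the profiled send_async time.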

Jayer23, Oct 18 '24 11:10