pulsar-client-python
[BUG] Initializing the Pulsar client fails when sending data to Pulsar from a Celery task
Environment: python 3.6, pulsar-client[avro]==2.10.2, celery 5.1.2
code example:
#!/usr/bin/env python
import time
import random
import string

from pulsar import Client, CompressionType
from pulsar.schema import AvroSchema, Record, String, Integer


def generate_random_string(length=6):
    charset = string.ascii_letters + string.digits
    random_chars = random.choices(charset, k=length)
    random_string = "".join(random_chars)
    return random_string.capitalize()


class User(Record):
    name = String()
    age = Integer()  # schema fields must be instances, not the bare class


UserAvroSchema = AvroSchema(User)  # type: ignore


def gen_random_data():
    # The Record field is called "name", so pass name=, not user=.
    return User(name=generate_random_string(), age=random.randint(0, 100))


class PulsarDemo(object):
    def __init__(self) -> None:
        self.SERVICE_URL = "pulsar://***"
        self.TOPIC = "persistent://****"
        client = Client(service_url=self.SERVICE_URL)
        self.producer = client.create_producer(
            topic=self.TOPIC,
            schema=UserAvroSchema,
            batching_enabled=True,
            batching_max_messages=1000,
            batching_max_publish_delay_ms=1000,
            compression_type=CompressionType.SNAPPY,  # type: ignore
        )

    def send_callback(self, send_result, msg_id):
        print("Message published: result:{} msg_id:{}".format(send_result, msg_id))

    def async_producer(self, cnt=1000):
        while cnt >= 0:
            data = gen_random_data()
            self.producer.send_async(
                data,
                callback=self.send_callback,
            )
            time.sleep(0.01)
            cnt -= 1
        # Wait until all batched messages have been published.
        self.producer.flush()


# celery task
from celery import shared_task


@shared_task
def mock_data2pulsar(cnt=1000):
    mock = PulsarDemo()
    mock.async_producer(cnt)  # forward cnt instead of silently using the default
The task fails with this exception:
[2023-08-29 10:27:46,933: ERROR/ForkPoolWorker-31] Pulsar error: TopicNotFound
I'm new to celery. Could you explain how to run the script you provided?
I can't fully reproduce it. A standalone celery + pulsar test works fine, but the production environment fails every time, and I have no leads so far. The production environment reports the following errors:
p._producer = self._client.create_producer(topic, conf)
_pulsar.TopicNotFound: Pulsar error: TopicNotFound
[client log output: a run of WARN, ERROR, and INFO lines whose timestamps and messages were lost]
In the production environment the problem only appears when the task is invoked through celery's .delay(). Calling the function directly raises no exception, as follows:
# ok
mock_data2pulsar(cnt=1000)
# exception
mock_data2pulsar.delay(cnt=1000)
I added debug prints to pulsar/__init__.py to show topic and conf just before p._producer = self._client.create_producer(topic, conf), and both look OK.
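One way to make that check more telling is to log the `repr()` of the topic together with the process ID, so that the output from the direct call and from the `.delay()` call (which runs in a Celery worker process) can be compared character by character. This is only a minimal sketch; `describe_topic` is a hypothetical helper, not part of pulsar-client.

```python
import os


def describe_topic(topic):
    """Return a debug string that exposes hidden characters in a topic name.

    Hypothetical helper for illustration; not part of pulsar-client.
    """
    # repr() makes stray whitespace, newlines, or quotes visible.
    return "pid={} len={} topic={!r}".format(os.getpid(), len(topic), topic)


# A trailing newline is invisible with plain print() but obvious with repr().
print(describe_topic("persistent://public/default/test-celery\n"))
```

Calling it once from the direct code path and once inside the task would show whether both processes really see the same string.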
Hi @dengshaochun. I tried to reproduce the bug, but the code works for me. I suspect the problem is in the topic name.
Here is my example: https://github.com/gromsterus/issues-sandbox/tree/main/pulsar-client-python-150
I use persistent://public/default/test-celery.
If you provide the complete code with celery initialization it will be easier to help 🤝
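To check the topic-name suspicion without involving the broker, the name can be tested against the fully qualified form `{persistent|non-persistent}://tenant/namespace/topic`. What follows is a rough sketch: `parse_topic` and `TOPIC_RE` are hypothetical names, the pattern only covers this fully qualified form, and a well-formed name can still refer to a topic that does not exist on the broker.

```python
import re

# Assumed shape of a fully qualified topic name:
#   {persistent|non-persistent}://tenant/namespace/topic
TOPIC_RE = re.compile(
    r"(?P<domain>persistent|non-persistent)://"
    r"(?P<tenant>[^/\s]+)/(?P<namespace>[^/\s]+)/(?P<topic>\S+)"
)


def parse_topic(name):
    """Split a fully qualified topic name into its parts, or return None."""
    # fullmatch() rejects leading or trailing whitespace, including a newline.
    match = TOPIC_RE.fullmatch(name)
    return match.groupdict() if match else None


print(parse_topic("persistent://public/default/test-celery"))
print(parse_topic("persistent://public/test-celery"))  # no namespace part
```

If the value configured in production comes back as `None`, the name itself is a likely culprit; if it parses cleanly, the next thing to compare is whether that tenant, namespace, and topic exist on the cluster the worker connects to.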