pulsar-client-python
[Bug] Python pulsar-client producer does not produce messages with gunicorn `--preload` flag
Search before asking
- [X] I searched in the issues and found nothing similar.
Version
Pulsar broker version 2.9, Python 3.9. Tested on macOS and in production (Linux).
Minimal reproduce step
1. Python env setup
Create a venv with the following packages installed:
requirements.txt
Flask==2.0.1
gevent==21.8.0
gunicorn==20.1.0
pulsar-client==2.9.2
2. Pulsar setup
Start a standalone Pulsar broker compatible with a 2.9 client:
~/dev/pulsar
> bin/pulsar standalone -nss
3. Python script
test_pulsar.py
import logging
import uuid
from datetime import datetime

from _pulsar import Result
from flask import Flask, make_response
from pulsar import Client, MessageId, schema

logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s')
logger = logging.getLogger(__name__)
logger.info('Loading API')

app = Flask(__name__)

client = Client('pulsar://127.0.0.1:6650', authentication=None)
producer = client.create_producer(
    'non-persistent://public/default/test-gunicorn',
    producer_name=f'my-producer-{uuid.uuid4()}',
    schema=schema.StringSchema()
)


def callback(res: Result, _msg_id: MessageId):
    logger.info('Callback result here! Event acknowledged by the broker.')


@app.post('/post-message')
def post_pulsar_message():
    logger.info('Calling producer.send_async now, in the next lines there should be the callback result')
    dt = datetime.now()
    producer.send_async(content=f'dt={dt.isoformat()}', callback=callback)
    logger.info('After producer.send_async, returning the http response')
    return '', 201


@app.get('/')
def healthcheck():
    logger.info('API Running fine')
    return make_response({'status': 'healthy'})


logger.info('API started')
4. Run configurations
In your virtual environment, run one of the following:
valid configuration
gunicorn test_pulsar:app --workers=2
broken configuration
gunicorn test_pulsar:app --workers=2 --preload
5. Call the producing endpoint
curl -X POST http://127.0.0.1:8000/post-message
What did you expect to see?
In the valid configuration, once the endpoint is called, the message is produced and the callback log line appears:
[INFO] [test_pulsar] Calling producer.send_async now, in the next lines there should be the callback result
[INFO] [test_pulsar] After producer.send_async, returning the http response
[INFO] [test_pulsar] Callback result here! Event acknowledged by the broker.
What did you see instead?
In the broken configuration, the `send_async` line executes, but no message is produced and the callback never fires:
[INFO] [test_pulsar] Calling producer.send_async now, in the next lines there should be the callback result
[INFO] [test_pulsar] After producer.send_async, returning the http response
<nothing else>
Anything else?
I also tried `producer.send` instead of `send_async`, but it blocks and eventually times out.
I also checked that the monkey-patching expected in gunicorn runtimes does not affect the issue (I tried with and without it).
It seems that the producer's execution context is incompatible with the gunicorn runtime, at least with these parameters.
This is a problem because the `--preload` flag is a must-have in production: it creates only one of each static value (producer, DB pool, caches and so on) instead of one per worker, and running without it is expensive in compute, memory and operations.
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
This issue had no activity for 30 days; marking it with the Stale label.
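Hi, @whisust. Gunicorn forks the main process to create each worker, so your approach won't work :( With `--preload`, the client and producer are created in the master process before the fork, and `fork()` copies only the calling thread into each worker. The client's background I/O threads therefore do not exist in the workers, and messages passed to `send_async` are never sent.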
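A toy model of this failure, using only the Python standard library, may make it easier to see. `ToySender` is hypothetical and not part of pulsar-client: it hands each send to a background thread, loosely mimicking a client that does its network I/O on internal threads (the real Pulsar client runs those threads in C++, not Python, so this illustrates the mechanism rather than reproducing the client). The object is built before the fork, as with `--preload`. In the forked child, `send_async` still accepts work, but no thread is left to process it.

```python
import os
import threading
import time
from typing import Callable, List


class ToySender:
    """Hypothetical stand-in for a client whose sends run on a background I/O thread."""

    def __init__(self) -> None:
        self._pending: List[Callable[[], None]] = []
        # The "I/O thread" starts at construction time, like a client created at import.
        threading.Thread(target=self._io_loop, daemon=True).start()

    def _io_loop(self) -> None:
        # Drain queued work. list.append/pop are atomic under the GIL, so no lock
        # can be left held across the fork.
        while True:
            while self._pending:
                self._pending.pop(0)()
            time.sleep(0.01)

    def send_async(self, callback: Callable[[], None]) -> None:
        # Only enqueues: nothing happens unless the I/O thread is running.
        self._pending.append(callback)


def callback_fires(sender: ToySender, timeout: float = 0.5) -> bool:
    """Send one message and report whether its callback ran within `timeout` seconds."""
    done = threading.Event()
    sender.send_async(done.set)
    return done.wait(timeout)


def callback_fires_in_forked_child(sender: ToySender) -> bool:
    """Fork (as the gunicorn master does) and run callback_fires() in the child."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:  # child: only the thread that called fork() exists here
        try:
            os.close(read_fd)
            os.write(write_fd, b"1" if callback_fires(sender) else b"0")
        finally:
            os._exit(0)
    os.close(write_fd)
    result = os.read(read_fd, 1)
    os.close(read_fd)
    os.waitpid(pid, 0)
    return result == b"1"


if __name__ == "__main__":
    sender = ToySender()  # created before the fork, like a preloaded module-level producer
    print("parent:", callback_fires(sender))                  # parent: True
    print("child: ", callback_fires_in_forked_child(sender))  # child:  False
```

This only runs on POSIX systems (`os.fork` is unavailable on Windows), and recent Python versions may emit a `DeprecationWarning` about forking a multi-threaded process, which is the same hazard in miniature.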
You could combine the Flask extension pattern with Gunicorn server hooks:
# test_pulsar.py
import logging
import uuid
from datetime import datetime
from typing import Optional

from _pulsar import Result
from flask import Flask, make_response
from pulsar import Client, MessageId, Producer, schema

logging.basicConfig(
    level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s'
)
logger = logging.getLogger(__name__)
pulsar_logger = logging.getLogger('pulsar')


class PulsarExt:
    def __init__(self, url: str) -> None:
        # Only store the configuration; nothing connects at import time
        self._url = url
        self._client: Optional[Client] = None
        self._g_producer: Optional[Producer] = None

    def init_app(self, app_: Flask) -> None:
        app_.extensions['pulsar'] = self

    @property
    def client(self) -> Client:
        assert self._client is not None, 'Call `connect()` first'
        return self._client

    @property
    def g_producer(self) -> Producer:
        assert self._g_producer is not None, 'Call `connect()` first'
        return self._g_producer

    def connect(self) -> None:
        # Must run inside the worker process, i.e. after gunicorn has forked it
        self._client = Client(self._url, authentication=None, logger=pulsar_logger)
        self._g_producer = self._client.create_producer(
            'non-persistent://public/default/test-gunicorn',
            producer_name=f'my-producer-{uuid.uuid4()}',
            schema=schema.StringSchema(),
        )

    def close(self) -> None:
        # Safe to call more than once
        if self._g_producer is not None:
            self._g_producer.close()
        if self._client is not None:
            self._client.close()
        self._g_producer = None
        self._client = None


app = Flask(__name__)
# Adjust to your broker address (the standalone broker above listens on 127.0.0.1:6650)
pulsar_ext = PulsarExt('pulsar://pulsar:6650')
pulsar_ext.init_app(app)


def init_app() -> None:
    pulsar_ext.connect()


def teardown_app() -> None:
    pulsar_ext.close()


def callback(res: Result, _msg_id: MessageId):
    logger.info('Callback result here! Event acknowledged by the broker.')


@app.post('/post-message')
def post_pulsar_message():
    logger.info(
        'Calling producer.send_async now, '
        'in the next lines there should be the callback result'
    )
    dt = datetime.now()
    pulsar_ext.g_producer.send_async(content=f'dt={dt.isoformat()}', callback=callback)
    logger.info('After producer.send_async, returning the http response')
    return '', 201


@app.get('/')
def healthcheck():
    logger.info('API Running fine')
    return make_response({'status': 'healthy'})
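# gunicorn_conf.py
from test_pulsar import init_app, teardown_app

def post_worker_init(worker):
    # Called in each worker after it has been forked and has loaded the app
    init_app()

def worker_int(worker):  # worker received SIGINT or SIGQUIT
    teardown_app()

def worker_abort(worker):  # worker received SIGABRT (usually on timeout)
    teardown_app()

def worker_exit(server, worker):  # called in the worker on any exit, including SIGTERM
    teardown_app()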
Then start the web server with:
gunicorn test_pulsar:app --config gunicorn_conf.py --workers=2 --preload
Now the client and producer are created in each worker, after the worker has been forked and initialized.
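If keeping hooks in a separate gunicorn config is inconvenient, a lazy per-process holder is another option. The sketch below is hypothetical: `PerProcessResource` is not part of Flask, gunicorn or pulsar-client. It builds the resource on first use in the current process and uses the standard library's `os.register_at_fork(after_in_child=...)` to discard any instance inherited from the parent. This relies on the parent forking through Python's `os.fork()`, which is how the gunicorn master spawns workers.

```python
import os
import threading
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class PerProcessResource(Generic[T]):
    """Hypothetical helper: lazily builds one resource per process and forgets it after fork()."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._value: Optional[T] = None
        self._lock = threading.Lock()
        # In every forked child, drop whatever the parent built: its threads did not survive.
        os.register_at_fork(after_in_child=self._reset_in_child)

    def _reset_in_child(self) -> None:
        self._value = None
        # The parent's lock may have been held at fork time, so replace it too.
        self._lock = threading.Lock()

    def get(self) -> T:
        # Double-checked locking so concurrent request threads build the resource only once.
        if self._value is None:
            with self._lock:
                if self._value is None:
                    self._value = self._factory()
        return self._value
```

In the app, this would look something like `producer = PerProcessResource(build_producer)` at module level, where `build_producer` is a hypothetical function that creates the `Client` and calls `create_producer(...)`, with handlers calling `producer.get().send_async(...)`. The trade-off compared with the hooks approach: nothing closes the client when a worker shuts down, and the first request in each worker pays the connection cost.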
Thanks, I will try this next week and come back to you!