[Bug] Python pulsar-client producer does not produce messages with gunicorn `--preload` flag

Open whisust opened this issue 2 years ago • 3 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Version

Pulsar broker version 2.9, Python 3.9. Tested on macOS and in production (Linux).

Minimal reproduce step

1. Python env setup

Create a venv with the following packages installed: requirements.txt

Flask==2.0.1
gevent==21.8.0
gunicorn==20.1.0
pulsar-client==2.9.2

2. Pulsar setup

Start a standalone pulsar broker compatible with a 2.9 client

~/dev/pulsar
> bin/pulsar standalone -nss

3. Python script

test_pulsar.py

import logging
import uuid
from datetime import datetime

from _pulsar import Result
from flask import Flask, make_response
from pulsar import Client, MessageId, schema

logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s')
logger = logging.getLogger(__name__)
logger.info('Loading API')

app = Flask(__name__)

client = Client('pulsar://127.0.0.1:6650', authentication=None)

producer = client.create_producer(
    'non-persistent://public/default/test-gunicorn',
    producer_name=f'my-producer-{uuid.uuid4()}',
    schema=schema.StringSchema()
)


def callback(res: Result, _msg_id: MessageId):
    logger.info(f'Callback result here! Event acknowledged by the broker.')


@app.post('/post-message')
def post_pulsar_message():
    logger.info('Calling producer.send_async now, in the next lines there should be the callback result')
    dt = datetime.now()
    producer.send_async(content=f'dt={dt.isoformat()}', callback=callback)
    logger.info('After producer.send_async, returning the http response')
    return '', 201


@app.get('/')
def healthcheck():
    logger.info('API Running fine')
    return make_response({'status': 'healthy'})


logger.info('API started')

4. Run configurations

In your virtual environment, run any of the following:

valid configuration

gunicorn test_pulsar:app --workers=2

broken configuration

gunicorn test_pulsar:app --workers=2 --preload

5. Call the producing endpoint

curl -X POST http://127.0.0.1:8000/post-message

What did you expect to see?

In the valid configuration, once the endpoint is called, the message is produced correctly and the callback log line appears.

[INFO] [test_pulsar] Calling producer.send_async now, in the next lines there should be the callback result
[INFO] [test_pulsar] After producer.send_async, returning the http response
[INFO] [test_pulsar] Callback result here! Event acknowledged by the broker.

What did you see instead?

In the broken configuration, the producer.send_async line is executed but no message is produced and the callback log line never appears.

[INFO] [test_pulsar] Calling producer.send_async now, in the next lines there should be the callback result
[INFO] [test_pulsar] After producer.send_async, returning the http response
<nothing else>

Anything else?

I also tried the blocking producer.send version, but it blocks and then times out.

I also checked that the monkey-patching expected in gunicorn runtimes does not affect the issue (tried with and without it).

The producer's execution context seems to be incompatible with the gunicorn runtime, at least with these parameters.

This matters because --preload is a must-have for us in production: it creates each static value (producer, DB pool, caches and so on) once instead of once per worker, and running without it is expensive in compute, memory and operations.

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

whisust avatar Oct 16 '22 09:10 whisust

The issue had no activity for 30 days, mark with Stale label.

github-actions[bot] avatar Nov 16 '22 02:11 github-actions[bot]

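Hi, @whisust. Gunicorn forks the main process when it initializes each worker, and with --preload the application module (including your client and producer) is loaded in the main process before that fork. Because of this, your approach won't work :(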
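To illustrate why creating things before the fork is a problem, here is a minimal sketch that uses only the standard library, not the Pulsar client. It assumes the client relies on background threads for I/O, which I have not verified in its source; `count_threads_in_child` is a hypothetical helper name. The sketch starts a background thread in the parent, forks, and counts the threads visible in the child. It needs a POSIX system, since `os.fork` is not available on Windows.

```python
import os
import threading


def count_threads_in_child() -> int:
    """Fork, then report how many Python threads are alive in the child."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: only the thread that called fork() is carried over.
        os.close(read_fd)
        os.write(write_fd, str(threading.active_count()).encode())
        os.close(write_fd)
        os._exit(0)
    # Parent process: read the child's answer, then reap the child.
    os.close(write_fd)
    with os.fdopen(read_fd) as reader:
        data = reader.read()
    os.waitpid(pid, 0)
    return int(data)


# Stand-in for a client's background I/O thread (an assumption for this demo).
stop = threading.Event()
background = threading.Thread(target=stop.wait, daemon=True)
background.start()

threads_in_parent = threading.active_count()
threads_in_child = count_threads_in_child()

stop.set()
background.join()

print(f"threads in parent: {threads_in_parent}")
print(f"threads in child:  {threads_in_child}")
```

The child sees fewer threads than the parent: the background thread exists only in the process that started it. An object that depends on such a thread would be copied into the worker without anything left to drive it.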

You could combine the Flask extensions pattern with Gunicorn server hooks:

# test_pulsar.py
import logging
import uuid
from datetime import datetime
from typing import Optional

from _pulsar import Result
from flask import Flask, make_response
from pulsar import Client, MessageId, Producer, schema


logging.basicConfig(
    level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s'
)
logger = logging.getLogger(__name__)
pulsar_logger = logging.getLogger('pulsar')


class PulsarExt:
    def __init__(self, url: str) -> None:
        self._url = url
        self._client: Optional[Client] = None
        self._g_producer: Optional[Producer] = None

    def init_app(self, app_: Flask) -> None:
        app_.extensions['pulsar'] = self

    @property
    def client(self) -> Client:
        assert self._client is not None, 'Call `connect()` first'
        return self._client

    @property
    def g_producer(self) -> Producer:
        assert self._g_producer is not None, 'Call `connect()` first'
        return self._g_producer

    def connect(self) -> None:
        self._client = Client(self._url, authentication=None, logger=pulsar_logger)
        self._g_producer = self._client.create_producer(
            'non-persistent://public/default/test-gunicorn',
            producer_name=f'my-producer-{uuid.uuid4()}',
            schema=schema.StringSchema(),
        )

    def close(self) -> None:
        if self._g_producer:
            self._g_producer.close()
        if self._client:
            self._client.close()

        self._g_producer = None
        self._client = None


app = Flask(__name__)
pulsar_ext = PulsarExt('pulsar://pulsar:6650')
pulsar_ext.init_app(app)


def init_app() -> None:
    pulsar_ext.connect()


def teardown_app() -> None:
    pulsar_ext.close()


@app.post('/post-message')
def post_pulsar_message():
    logger.info(
        'Calling producer.send_async now, '
        'in the next lines there should be the callback result'
    )
    dt = datetime.now()
    pulsar_ext.g_producer.send_async(content=f'dt={dt.isoformat()}', callback=callback)
    logger.info('After producer.send_async, returning the http response')
    return '', 201


def callback(res: Result, _msg_id: MessageId):
    logger.info(f'Callback result here! Event acknowledged by the broker.')


@app.get('/')
def healthcheck():
    logger.info('API Running fine')
    return make_response({'status': 'healthy'})


# gunicorn_conf.py
from test_pulsar import init_app, teardown_app


def post_worker_init(worker):
    init_app()


def worker_int(worker):
    teardown_app()


def worker_abort(worker):
    teardown_app()

Then start the webserver with

gunicorn test_pulsar:app --config gunicorn_conf.py --workers=2 --preload

Now the client and producer are created after the fork, once each worker has initialized.
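If you would rather not depend on server hooks, the same "create it after the fork" idea can be sketched as a lazy holder that remembers which process built its value. This is only an illustration under my own assumptions: `PerProcess` and `get` are hypothetical names, not part of pulsar-client, Flask or Gunicorn, and the factory is whatever function builds your client and producer.

```python
import os
import threading
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class PerProcess(Generic[T]):
    """Lazily build one value per process, rebuilding it after a fork."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._lock = threading.Lock()
        self._owner_pid: Optional[int] = None
        self._value: Optional[T] = None

    def get(self) -> T:
        pid = os.getpid()
        with self._lock:
            if self._owner_pid != pid:
                # First use in this process (or first use after a fork):
                # build a fresh value instead of reusing the parent's copy.
                self._value = self._factory()
                self._owner_pid = pid
            return self._value


# Demo with a plain object standing in for a real client or producer.
built = []


def make_resource() -> object:
    resource = object()
    built.append(resource)
    return resource


holder = PerProcess(make_resource)
first = holder.get()
second = holder.get()
print(first is second, len(built))
```

Within one process, repeated calls return the same object. In a forked worker `os.getpid()` differs from the recorded PID, so the first `get()` there builds a new value. The trade-off compared with the hooks is that nothing closes the value on worker shutdown, and `get()` must not be called in the main process before the fork.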

gromsterus avatar Nov 30 '22 09:11 gromsterus

Thanks, I will try this next week and come back to you!

whisust avatar Dec 06 '22 17:12 whisust