pulsar-client-python
[2.8.0] Python client instances emit logs for every client instance ever constructed with a `logger` argument
Describe the bug
When Client instances are constructed with the Python driver, they seem to accumulate the value of the logger argument as global state. This means two things:
- If a Client is constructed without a logger, all subsequent clients constructed with a logger will not use it.
- If a Client is constructed with a logger, and another client is constructed with a different logger, the second client will emit all logs twice: once to the first client's logger, and once to its own logger.
These behaviors both occur whether or not previously-constructed clients still exist--even if previous clients have been disconnected and garbage collected, the issues still occur.
To Reproduce
- Ensure a broker is running (I tested against 2.8.0) on localhost:6650.
- Run this Python snippet:
import logging
from pulsar import Client

def logger_with_prefix(prefix):
    logger = logging.getLogger('test')
    ch = logging.StreamHandler()
    formatter = logging.Formatter('{}: %(message)s'.format(prefix))
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger

if __name__ == '__main__':
    print("Creating first")
    first = Client(service_url='pulsar://localhost:6650/')
    print("Destroying first")
    del first

    print("Creating second")
    second = Client(
        service_url='pulsar://localhost:6650/',
        logger=logger_with_prefix("FOO"),
    )
    consumer = second.subscribe('sometopic', 'somesub')
    consumer.close()
- Observe that the Python logger is not used when logging consumer creation/destruction (sample output below).
- Run this Python snippet:
import logging
from pulsar import Client

def logger_with_prefix(prefix):
    logger = logging.getLogger('test')
    ch = logging.StreamHandler()
    formatter = logging.Formatter('{}: %(message)s'.format(prefix))
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger

if __name__ == '__main__':
    print("Creating first")
    first = Client(
        service_url='pulsar://localhost:6650/',
        logger=logger_with_prefix("FOO"),
    )
    print("Destroying first")
    del first

    print("Creating second")
    second = Client(
        service_url='pulsar://localhost:6650/',
        logger=logger_with_prefix("BAR"),
    )
    consumer = second.subscribe('sometopic', 'somesub')
    consumer.close()
- Observe that logs for the consumer operations are emitted twice, once on the FOO logger and once on the BAR logger.
- Run this Python snippet:
import logging
from pulsar import Client

def logger_with_prefix(prefix):
    logger = logging.getLogger('test')
    ch = logging.StreamHandler()
    formatter = logging.Formatter('{}: %(message)s'.format(prefix))
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger

if __name__ == '__main__':
    print("Creating first")
    first = Client(
        service_url='pulsar://localhost:6650/',
        logger=logger_with_prefix("FOO"),
    )
    print("Destroying first")
    del first

    print("Creating second")
    second = Client(
        service_url='pulsar://localhost:6650/',
    )
    consumer = second.subscribe('sometopic', 'somesub')
    consumer.close()
- Observe that logs are emitted with the FOO prefix.
Expected behavior
- In the first snippet, all logs should be emitted with the FOO prefix.
- In the second snippet, all logs should be emitted with the BAR prefix.
- In the third snippet, all logs should be emitted with no prefix/using the internal log4cxx logger.
Desktop (please complete the following information):
- OS: MacOS 10.11
Erroneous output of snippet 1:
∴ python tests/benchmark/scratch.py
Creating first
Destroying first
Creating second
2021-08-30 16:44:35.805 INFO [0x1178f2e00] Client:88 | Subscribing on Topic :sometopic
2021-08-30 16:44:35.806 INFO [0x1178f2e00] ConnectionPool:84 | Created connection for pulsar://localhost:6650/
2021-08-30 16:44:35.808 INFO [0x70000c911000] ClientConnection:372 | [127.0.0.1:57417 -> 127.0.0.1:6650] Connected to broker
2021-08-30 16:44:35.821 INFO [0x70000c911000] HandlerBase:55 | [persistent://public/default/sometopic, somesub, 0] Getting connection from pool
2021-08-30 16:44:35.822 INFO [0x70000c911000] ConnectionPool:84 | Created connection for pulsar://localhost:6650
2021-08-30 16:44:35.823 INFO [0x70000c911000] ClientConnection:374 | [127.0.0.1:57418 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://localhost:6650
2021-08-30 16:44:35.839 INFO [0x70000c911000] ConsumerImpl:220 | [persistent://public/default/sometopic, somesub, 0] Created consumer on broker [127.0.0.1:57418 -> 127.0.0.1:6650]
2021-08-30 16:44:35.839 INFO [0x1178f2e00] ConsumerImpl:874 | [persistent://public/default/sometopic, somesub, 0] Closing consumer for topic persistent://public/default/sometopic
2021-08-30 16:44:35.840 INFO [0x70000c911000] ConsumerImpl:930 | [persistent://public/default/sometopic, somesub, 0] Closed consumer 0
2021-08-30 16:44:35.848 INFO [0x1178f2e00] ClientConnection:1446 | [127.0.0.1:57418 -> 127.0.0.1:6650] Connection closed
2021-08-30 16:44:35.848 ERROR [0x70000c911000] ClientConnection:531 | [127.0.0.1:57418 -> 127.0.0.1:6650] Read failed: Operation canceled
2021-08-30 16:44:35.849 INFO [0x1178f2e00] ClientConnection:261 | [127.0.0.1:57418 -> 127.0.0.1:6650] Destroyed connection
2021-08-30 16:44:35.849 ERROR [0x70000c911000] ClientConnection:531 | [127.0.0.1:57417 -> 127.0.0.1:6650] Read failed: Operation canceled
2021-08-30 16:44:35.849 INFO [0x1178f2e00] ClientConnection:1446 | [127.0.0.1:57417 -> 127.0.0.1:6650] Connection closed
2021-08-30 16:44:35.849 INFO [0x1178f2e00] ClientConnection:261 | [127.0.0.1:57417 -> 127.0.0.1:6650] Destroyed connection
Erroneous output of snippet 2:
import logging
from pulsar import Client

def logger_with_prefix(prefix):
    logger = logging.getLogger('test')
    ch = logging.StreamHandler()
    formatter = logging.Formatter('{}: %(message)s'.format(prefix))
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger

if __name__ == '__main__':
    print("Creating first")
    first = Client(
        service_url='pulsar://localhost:6650/',
        logger=logger_with_prefix("FOO"),
    )
    print("Destroying first")
    del first

    print("Creating second")
    second = Client(
        service_url='pulsar://localhost:6650/',
        logger=logger_with_prefix("BAR"),
    )
    consumer = second.subscribe('sometopic', 'somesub')
    consumer.close()
Erroneous output of snippet 3:
∴ python tests/benchmark/scratch.py
Creating first
Destroying first
Creating second
FOO: Subscribing on Topic :sometopic
FOO: Created connection for pulsar://localhost:6650/
FOO: [127.0.0.1:57427 -> 127.0.0.1:6650] Connected to broker
FOO: [persistent://public/default/sometopic, somesub, 0] Getting connection from pool
FOO: Created connection for pulsar://localhost:6650
FOO: [127.0.0.1:57428 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://localhost:6650
FOO: [persistent://public/default/sometopic, somesub, 0] Created consumer on broker [127.0.0.1:57428 -> 127.0.0.1:6650]
FOO: [persistent://public/default/sometopic, somesub, 0] Closing consumer for topic persistent://public/default/sometopic
FOO: [persistent://public/default/sometopic, somesub, 0] Closed consumer 0
FOO: [127.0.0.1:57428 -> 127.0.0.1:6650] Connection closed
FOO: [127.0.0.1:57427 -> 127.0.0.1:6650] Connection closed
Could someone please point me at what code I should look at if I wanted to put up a PR to fix this? It's quite a hassle for environments with more than one Pulsar cluster (and thus more than one client). I'm happy to do the work, but am not familiar enough with C++ to find the code responsible here--I think it has something to do with constexpr, but am not positive.
In the C++ client, the logger factory is a global object and can only be initialized once, in the constructor of Client.
https://github.com/apache/pulsar/blob/9a23dd0eb5557f3d8715437f31f43c9875729d94/pulsar-client-cpp/lib/ClientImpl.cc#L118
https://github.com/apache/pulsar/blob/9a23dd0eb5557f3d8715437f31f43c9875729d94/pulsar-client-cpp/lib/LogUtils.cc#L41-L49
The LogUtils::setLoggerFactory method ensures that the logger factory can only be initialized once.
I think it's better not to change the current behavior. Each C++ source file uses the DECLARE_LOG_OBJECT macro to get the thread-local logger from the factory.
https://github.com/apache/pulsar/blob/9a23dd0eb5557f3d8715437f31f43c9875729d94/pulsar-client-cpp/lib/LogUtils.h#L37-L47
Once a thread's logger has been created, it won't be affected even if the global logger factory is reset.
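For context, a minimal Python-side sketch of what that set-once behavior implies in practice (the 'pulsar.shared' logger name and make_client helper below are illustrative, not part of the pulsar API): since only the first Client's logger is ever captured by the global factory, one workaround is to pass the same long-lived logger to every client and vary behavior through its handlers rather than per client.

import logging
from pulsar import Client

# One shared logger for the whole process; whichever Client initializes the
# global factory first still routes all native-library logs through it.
_shared_logger = logging.getLogger('pulsar.shared')
_shared_logger.addHandler(logging.StreamHandler())
_shared_logger.setLevel(logging.INFO)

def make_client(service_url):
    # Every client reuses the same logger object instead of a fresh one.
    return Client(service_url=service_url, logger=_shared_logger)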
The issue had no activity for 30 days, mark with Stale label.
@BewareMyPower that's pretty unusual and bad behavior, at least in the Python ecosystem. Client objects aren't singletons; they shouldn't store state once discarded. Most production code that uses clients for anything (databases, brokers, you name it) supports the total destruction and then reconnection/reconfiguration of clients.
That includes logging behavior; especially because there are some really weird interactions between Python clients with the logger set and the Python runtime itself (https://github.com/apache/pulsar/issues/16527), I think it's important to make sure that when a Client object is garbage collected in Python, all state is reset to what it was before that Client was created. That includes:
- Logger settings (i.e. no global singletons in C++; everything should be scoped--the Pulsar client shouldn't act like a Boost application with const/static loggers; it should act like a Boost library that can be loaded/unloaded/reset).
- Threads--none should leak.
- Filehandles.
In general, the client shouldn't use process-global state, just instance-global state. Instances are not necessarily long-lived or singletons.
For example, our production use case maintains (and periodically disconnects/reconnects) a handful of clients to different brokers, each with different logging settings (a sketch of that pattern follows below).
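A hedged sketch of that multi-cluster pattern (the cluster names and broker URLs here are made up): several clients to different clusters, each constructed with its own logger. Under the current set-once factory behavior, only whichever client happens to be constructed first actually gets its logger used.

import logging
from pulsar import Client

def make_logger(name):
    logger = logging.getLogger(name)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(name)s: %(message)s'))
    logger.addHandler(handler)
    return logger

# Hypothetical cluster names and URLs, purely for illustration.
clients = {
    name: Client(service_url=url, logger=make_logger(name))
    for name, url in {
        'cluster-a': 'pulsar://broker-a:6650',
        'cluster-b': 'pulsar://broker-b:6650',
    }.items()
}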
Yes. I think it's just because all of the Pulsar C++ client's source code shares the same logger factory, but the ClientConfiguration has a logger factory config.
I think we should add an independent logger config to the C++ client that manages the logger of the whole library; it should not be configured on a Client. And we should make it clear which classes can be affected by changing the Client's logger.
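To make the shape of that proposal concrete, here is a rough sketch of what a library-level logger configuration could look like from the Python side. Note that pulsar.set_logger does not exist today; it is purely hypothetical and only illustrates "configured once for the whole library, not per Client".

import logging
import pulsar

# Hypothetical library-level API: one process-wide logger for everything the
# native library emits, configured once and not tied to any Client instance.
pulsar.set_logger(logging.getLogger('pulsar'))  # not a real API today

# Clients would then be constructed without a per-instance logger argument.
client = pulsar.Client(service_url='pulsar://localhost:6650/')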
I think that still may result in issues, though. What if the library-global logger factory is configured to wrap a logger object whose lifecycle is not bound to the library--like a Python logger, which may need to be mutated, created, destroyed, etc., between constructions of Client? Wouldn't that result in issues like this one?
In other words, I think a library-global logger is not a good idea so long as we support passing in logger objects from Python. I think the two are mutually exclusive (if you don't want bugs/weird behavior, that is).
IMO, the library-global logger is more for safety. Currently the C++ client uses a thread-local logger for all source files of the library so that the user-provided Logger implementation doesn't need to be thread-safe. If a logger were bound to a specific class instance like Client, the logger could be used by multiple threads, so it would have to be thread-safe.
Do you have any better idea? @merlimat
To double down, I think the statement @merlimat made here:
you're expected to keep 1 single instance of C++ Pulsar client for the duration of your application
is not going to be true in a lot of the Python world. In Python, the Pulsar client is one library among many in potentially long-lived applications; it is not necessarily a global singleton, nor is it expected (or desired) to manage a global pool of connection state. Python programs can and will briefly use Pulsar clients and then fully dispose of them, potentially doing this many times throughout their lives.
@BewareMyPower re: thread safety, some thoughts:
- Splitting loggers into thread-local versions for each file is no problem, so long as those thread-local instances (and the threads they're in) have lifetimes tied to the client object that created them. So long as we properly join those threads on client destruction and never daemonize them (I think it's "detach" in Boost?), their lifetimes should be something that can be linked to the parent client object.
- Do we need true nonblocking thread safety for those loggers? Could they be protected with a lock instead, so long as we didn't acquire that lock until as late as possible, i.e. until we knew we needed to emit a log message at a given level (I assume application-side adjustment of the level at runtime can be done atomically in C++)? My hunch is that this would be OK performance-wise: most loggers write to the same stream/file anyway, so there's already a lock around their internal "write" actions to prevent the output data from getting mangled, and moving that lock "up" into application code wouldn't harm much, if anything. A Python sketch of this lock-based idea follows below.
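A small sketch of that lock-based idea, in Python and for illustration only (the actual change would live in the C++ client): the lock is acquired only after the level check decides a record will really be emitted, so disabled levels stay cheap. The LockedLogger class is hypothetical.

import logging
import threading

class LockedLogger:
    # Serializes the emit path of a wrapped logger behind a single lock.
    def __init__(self, logger):
        self._logger = logger
        self._lock = threading.Lock()

    def log(self, level, message):
        # Cheap, lock-free level check first; only serialize the actual emit.
        if self._logger.isEnabledFor(level):
            with self._lock:
                self._logger.log(level, message)

# Example usage with a standard library logger.
locked = LockedLogger(logging.getLogger('pulsar'))
locked.log(logging.INFO, "connected")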
Moved to the new dedicated client repo.