confluent-kafka-python
AdminClient doesn't support specifying the logger
Description
The admin client from the library doesn't support specifying a logger. Passing one raises this error:
TypeError: __init__() got an unexpected keyword argument 'logger'
However, the producer and consumer clients do. Is there something I am missing, or is there an alternative to specify the logger in the admin client?
How to reproduce
import logging
import sys
from confluent_kafka.admin import AdminClient
logger = logging.getLogger("kafka_admin")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.formatter = JsonFormatter("%(message)s")
logger.addHandler(handler)
logger.propagate = False
admin_client = AdminClient(config, logger=logger)
Checklist
Please provide the following information:
- confluent_kafka.version() is ('2.2.0', 33685504)
- confluent_kafka.libversion() is ('2.2.0', 33685759)
- OS = ubuntu
You can use the "logger" property inside the config.
import logging
import sys
from confluent_kafka.admin import AdminClient
logger = logging.getLogger("kafka_admin")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.formatter = JsonFormatter("%(message)s")
logger.addHandler(handler)
logger.propagate = False
config["logger"] = logger
admin_client = AdminClient(config)
Will the AdminClient code recognize this logger passed as such?
@pranavrth Can you comment?
It should work in the way I have mentioned. Is it not working?
@watpp did @pranavrth's example solve your issue?
I've been unable to get this to work as well.
I've got Kafka running locally, but it advertises a domain that doesn't exist, so it produces an error when I call describe_cluster. Using the following test code (without setting the logger):
import logging
from confluent_kafka.admin import AdminClient
log = logging.getLogger("test")
log.addHandler(logging.FileHandler("test_log.log"))
log.setLevel("INFO")
if __name__ == "__main__":
    config = {
        "bootstrap.servers": "127.0.0.1:9092",
    }
    log.info("Creating Client")
    client = AdminClient(config)
    future = client.describe_cluster(request_timeout=5)
    future.result()
I get the following:
# stdout/stderr
%3|1710088788.195|FAIL|rdkafka#producer-1| [thrd:kafka:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': nodename nor servname provided, or not known (after 2ms in state CONNECT)
%3|1710088789.203|FAIL|rdkafka#producer-1| [thrd:kafka:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': nodename nor servname provided, or not known (after 2ms in state CONNECT, 1 identical error(s) suppressed)
# test_log.log
Creating Client
Then when I add the logger into the config:
config = {
    "bootstrap.servers": "127.0.0.1:9092",
    "logger": log,
}
I no longer get anything printed to the screen, but the errors are also not written to test_log.log. I've tried using the logging.StreamHandler(sys.stdout) as the handler, but only my logs are printed to the screen - the kafka errors don't appear.
There is some issue for sure. I am marking it as a bug to further look into it.
What happens here is that log events are sent to the client's main queue, which is generally served by poll. The same is true for the Consumer and Producer. Those APIs have other important callbacks that are served in poll, such as rebalance_cb in the Consumer and delivery_cb in the Producer, so poll has to be called there anyway, as the examples explain.
For the admin client, poll is not normally expected, since there it mainly serves only the log callback. For the admin client to use the provided logger, call admin_client.poll whenever those logs need to be served through the provided logger.
This is by design right now. We are thinking of improving this in the future so that the logs are served through a background thread.
TL;DR: Use admin_client.poll to serve the logs with the custom logger.
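For anyone landing here, the poll approach described above might look roughly like the sketch below. It reuses the broker address and the describe_cluster call from the test code earlier in this thread. The helper names (build_logger, serve_logs, describe_cluster_with_logging), the number of poll rounds, and the timeouts are assumptions made for illustration, not part of the library.

```python
import logging
import sys


def build_logger(name="kafka_admin"):
    """Create a stdout logger that the client's log lines can be routed to."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid stacking handlers on repeated calls
        logger.addHandler(logging.StreamHandler(sys.stdout))
    logger.propagate = False
    return logger


def serve_logs(client, rounds=5, timeout=0.2):
    """Call client.poll() a few times so queued log events reach the logger.

    Returns the total number of events poll() reported as served.
    The round count and timeout are arbitrary illustrative values.
    """
    served = 0
    for _ in range(rounds):
        served += client.poll(timeout) or 0
    return served


def describe_cluster_with_logging(bootstrap_servers="127.0.0.1:9092"):
    """Hypothetical helper: run describe_cluster, then serve the queued logs."""
    # Imported here so the helpers above stay usable without the library.
    from confluent_kafka.admin import AdminClient

    config = {
        "bootstrap.servers": bootstrap_servers,  # assumed local broker
        "logger": build_logger(),  # logger passed through the config
    }
    client = AdminClient(config)
    future = client.describe_cluster(request_timeout=5)
    try:
        return future.result()
    finally:
        # Without this, the log events stay queued and never reach the logger.
        serve_logs(client)
```

Calling describe_cluster_with_logging() against the misconfigured broker from the earlier comment should then route the FAIL lines through the "kafka_admin" logger instead of dropping them, assuming poll is given enough rounds to drain the queue.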
- I am updating the example as well to reflect the same.
- Please use the "logger" config property instead of passing logger as an argument to the AdminClient for now. I am adding logger as an argument to the AdminClient as well.
PR for the above changes - https://github.com/confluentinc/confluent-kafka-python/pull/1758
Closing this issue, as a working approach is shown in the example.
Also, fixed an issue where AdminClient was not accepting logger as an argument.
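If the same code has to run on versions both before and after that fix, one option is to try the keyword argument first and fall back to the config property. The sketch below is only an illustration: make_admin_client is a hypothetical helper, and it assumes that older versions raise the TypeError from the original report as soon as the constructor is called.

```python
def make_admin_client(admin_cls, config, logger):
    """Create an admin client with a logger on old and new library versions.

    admin_cls is expected to be confluent_kafka.admin.AdminClient; it is a
    parameter here only so the helper has no import-time dependency.
    """
    try:
        # Newer versions accept logger as a keyword argument.
        return admin_cls(config, logger=logger)
    except TypeError:
        # Older versions reject the keyword, so pass the logger through
        # the config instead. The caller's dict is copied, not mutated.
        return admin_cls({**config, "logger": logger})
```

Note that catching TypeError this broadly could also hide an unrelated TypeError raised by the constructor, so this is best treated as a stopgap. Either way, admin_client.poll still needs to be called for the logs to be served.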