confluent-kafka-python
AdminClient doesn't support specifying the logger
Description
Admin client from the library doesn't support specification of a logger. It generates the error:
TypeError: __init__() got an unexpected keyword argument 'logger'
However, the producer and consumer clients do. Is there something I am missing, or is there an alternative to specify the logger in the admin client?
How to reproduce
import logging
import sys
from confluent_kafka.admin import AdminClient
logger = logging.getLogger("kafka_admin")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.formatter = JsonFormatter("%(message)s")
logger.addHandler(handler)
logger.propagate = False
admin_client = AdminClient(config, logger=logger)
Checklist
Please provide the following information:
- confluent_kafka.version() is ('2.2.0', 33685504)
- confluent_kafka.libversion() is ('2.2.0', 33685759)
- OS = ubuntu
You can use the "logger" property inside the config.
import logging
import sys
from confluent_kafka.admin import AdminClient
logger = logging.getLogger("kafka_admin")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.formatter = JsonFormatter("%(message)s")
logger.addHandler(handler)
logger.propagate = False
config["logger"] = logger
admin_client = AdminClient(config)
Will the AdminClient code recognize this logger passed as such?
@pranavrth Can you comment?
It should work in the way I have mentioned. Is it not working?
@watpp did @pranavrth's example solve your issue?
I've been unable to get this to work as well.
I've got Kafka running locally, but it advertises a domain that doesn't exist, so it produces an error when I try to use describe_cluster. Using the following test code (w/o setting logger):
import logging
from confluent_kafka.admin import AdminClient
log = logging.getLogger("test")
log.addHandler(logging.FileHandler("test_log.log"))
log.setLevel("INFO")
if __name__ == "__main__":
    config = {
        "bootstrap.servers": "127.0.0.1:9092",
    }
    log.info("Creating Client")
    client = AdminClient(config)
    future = client.describe_cluster(request_timeout=5)
    future.result()
I get the following:
# stdout/stderr
%3|1710088788.195|FAIL|rdkafka#producer-1| [thrd:kafka:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': nodename nor servname provided, or not known (after 2ms in state CONNECT)
%3|1710088789.203|FAIL|rdkafka#producer-1| [thrd:kafka:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': nodename nor servname provided, or not known (after 2ms in state CONNECT, 1 identical error(s) suppressed)
# test_log.log
Creating Client
Then when I add the logger into the config:
config = {
    "bootstrap.servers": "127.0.0.1:9092",
    "logger": log,
}
I no longer get anything printed to the screen, but the errors are also not written to test_log.log. I've tried using logging.StreamHandler(sys.stdout) as the handler, but only my logs are printed to the screen; the Kafka errors don't appear.
There is some issue for sure. I am marking it as a bug to further look into it.
What happens here is that log operations are sent to the client's main queue, which is generally served by poll. This happens in Consumer and Producer as well. Those APIs have other important callbacks that are served in poll, such as rebalance_cb in Consumer and delivery_cb in Producer, so poll must be called there, as the examples explain.
For the admin client, calling poll is not expected, since it mainly serves only the log callback. For the admin client to use the provided logger, please call admin_client.poll whenever those logs need to be served through the provided logger.
This is by design right now. We are thinking of improving this in the future so that logs are served through the background thread.
TL;DR: Use admin_client.poll to serve the logs with the custom logger.
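A minimal sketch of the polling approach described above, assuming the logger is passed through the "logger" config property. The helper name serve_client_logs, its duration and interval parameters, and the demo function run_against_local_broker are hypothetical and not part of the library; only AdminClient, describe_cluster and poll come from the thread.

```python
import logging
import time


def serve_client_logs(client, duration=1.0, interval=0.1):
    """Call client.poll() repeatedly for roughly `duration` seconds.

    `client` is any object with a poll(timeout) method, for example an
    AdminClient whose config contains a "logger" entry. Returns the sum
    of the event counts reported by poll().
    """
    deadline = time.monotonic() + duration
    served = 0
    while time.monotonic() < deadline:
        # poll() serves the client's main queue, which is where the
        # log events for the custom logger are waiting.
        served += client.poll(interval) or 0
    return served


def run_against_local_broker():
    """Hypothetical demo; needs confluent_kafka and a broker address."""
    from confluent_kafka.admin import AdminClient

    log = logging.getLogger("test")
    log.addHandler(logging.FileHandler("test_log.log"))
    log.setLevel(logging.INFO)

    client = AdminClient({"bootstrap.servers": "127.0.0.1:9092", "logger": log})
    future = client.describe_cluster(request_timeout=5)
    try:
        future.result()
    except Exception as exc:
        log.error("describe_cluster failed: %s", exc)
    finally:
        # Without this call the librdkafka log lines stay queued and
        # never reach the handler attached to `log`.
        serve_client_logs(client, duration=1.0)
```

Calling run_against_local_broker() against the misconfigured broker from the earlier comment should, under these assumptions, route the FAIL lines to test_log.log instead of dropping them. How long to keep polling is a judgment call; one second is an arbitrary choice here.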
- I am updating the example as well to reflect the same.
- Please use the logger config property instead of passing logger as an argument to AdminClient for now. I am adding logger as an argument to AdminClient as well.
PR for the above changes - https://github.com/confluentinc/confluent-kafka-python/pull/1758
Closing this issue, as a working approach is shown in the example.
Also, fixed an issue where AdminClient was not accepting logger as an argument.
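For code that has to run on releases both with and without that fix, one option is to try the keyword argument first and fall back to the config property. This is only a sketch: make_admin_client and its factory parameter are hypothetical names, and the thread does not say which release first accepts the keyword.

```python
def make_admin_client(factory, config, logger):
    """Build an admin client, preferring the `logger` keyword if accepted.

    `factory` is the client class (for example AdminClient); passing it in
    keeps this helper independent of the installed library version.
    """
    try:
        return factory(config, logger=logger)
    except TypeError:
        # Releases without the fix reject the keyword, so pass the logger
        # through the "logger" config property instead.
        return factory({**config, "logger": logger})
```

Usage would look like make_admin_client(AdminClient, config, logger). Note that catching TypeError this broadly can also hide an unrelated TypeError raised by the constructor, so pinning a library version is the simpler choice where possible.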