
AdminClient doesn't support specifying the logger

Open watpp opened this issue 1 year ago • 10 comments

Description

The AdminClient from the library doesn't support specifying a logger. Passing one raises:

TypeError: __init__() got an unexpected keyword argument 'logger'

However, the Producer and Consumer clients do support it. Is there something I am missing, or is there an alternative way to specify the logger for the admin client?
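
For contrast, a minimal sketch of the asymmetry being reported (the broker address and config here are illustrative only): the Producer accepts the keyword, while the AdminClient rejects it at this version.

import logging

from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient

logger = logging.getLogger("kafka_admin")

config = {"bootstrap.servers": "127.0.0.1:9092"}  # illustrative only

Producer(config, logger=logger)      # accepted
AdminClient(config, logger=logger)   # TypeError at the reported version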

How to reproduce

import logging
import sys
from confluent_kafka.admin import AdminClient

logger = logging.getLogger("kafka_admin")
logger.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
# JsonFormatter is a JSON log formatter defined elsewhere (e.g. python-json-logger's
# JsonFormatter); config is the AdminClient configuration dict.
handler.setFormatter(JsonFormatter("%(message)s"))

logger.addHandler(handler)
logger.propagate = False
admin_client = AdminClient(config, logger=logger)  # raises the TypeError above

Checklist

Please provide the following information:

  • confluent_kafka.version() is ('2.2.0', 33685504)
  • confluent_kafka.libversion() is ('2.2.0', 33685759)
  • OS = ubuntu

watpp avatar Jan 02 '24 09:01 watpp

You can use the "logger" property inside the config:

import logging
import sys
from confluent_kafka.admin import AdminClient

logger = logging.getLogger("kafka_admin")
logger.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
# JsonFormatter and config come from the original snippet above.
handler.setFormatter(JsonFormatter("%(message)s"))

logger.addHandler(handler)
logger.propagate = False
config["logger"] = logger
admin_client = AdminClient(config)

pranavrth avatar Jan 02 '24 09:01 pranavrth

Will the AdminClient code recognize this logger passed as such?

watpp avatar Jan 02 '24 13:01 watpp

@pranavrth Can you comment?

watpp avatar Jan 15 '24 12:01 watpp

It should work in the way I have mentioned. Is it not working?

pranavrth avatar Jan 16 '24 09:01 pranavrth

@watpp did @pranavrth's example solve your issue?

nhaq-confluent avatar Feb 11 '24 23:02 nhaq-confluent

I've been unable to get this to work either.

I've got Kafka running locally, but it advertises a hostname that doesn't resolve, so describe_cluster produces an error when I call it. Using the following test code (without setting the logger):

import logging

from confluent_kafka.admin import AdminClient

log = logging.getLogger("test")
log.addHandler(logging.FileHandler("test_log.log"))
log.setLevel("INFO")


if __name__ == "__main__":
    config = {
        "bootstrap.servers": "127.0.0.1:9092",
    }
    log.info("Creating Client")
    client = AdminClient(config)

    future = client.describe_cluster(request_timeout=5)
    future.result()

I get the following:

# stdout/stderr
%3|1710088788.195|FAIL|rdkafka#producer-1| [thrd:kafka:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': nodename nor servname provided, or not known (after 2ms in state CONNECT)
%3|1710088789.203|FAIL|rdkafka#producer-1| [thrd:kafka:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': nodename nor servname provided, or not known (after 2ms in state CONNECT, 1 identical error(s) suppressed)

# test_log.log
Creating Client

Then when I add the logger into the config:

    config = {
        "bootstrap.servers": "127.0.0.1:9092",
        "logger": log,
    }

I no longer get anything printed to the screen, but the errors are also not written to test_log.log. I've tried using logging.StreamHandler(sys.stdout) as the handler, but only my own logs are printed to the screen; the Kafka errors don't appear.

geoff-va avatar Mar 10 '24 16:03 geoff-va

There is definitely some issue here. I am marking it as a bug so we can look into it further.

pranavrth avatar Mar 12 '24 16:03 pranavrth

What happens here is that log events are sent to the client's main queue, which is served by poll(). This is true for the Consumer and Producer as well, but those APIs also serve other important callbacks through poll(), such as rebalance_cb in the Consumer and delivery_cb in the Producer, so calling poll() is already required there and is shown in the examples.

For the admin client, calling poll() is not normally expected, as its queue mainly serves the log callback. For the admin client to use the provided logger, please call admin_client.poll() whenever those logs need to be delivered through the provided logger.

This is by design right now. We are considering improving this in the future so that logs are served through the background thread.

TL;DR: Call admin_client.poll() to serve the logs with the custom logger, as in the sketch below.
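
A minimal sketch of that workflow, assuming an illustrative broker address and reusing describe_cluster from the earlier reproduction; the poll() loop is what actually dispatches the queued log events to the Python logger:

import logging
import sys

from confluent_kafka.admin import AdminClient

logger = logging.getLogger("kafka_admin")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))

# Broker address is illustrative only.
config = {
    "bootstrap.servers": "127.0.0.1:9092",
    "logger": logger,
}
admin_client = AdminClient(config)

future = admin_client.describe_cluster(request_timeout=5)

# Serve the log callback: without poll(), log events queued on the
# client's main queue never reach the Python logger.
while not future.done():
    admin_client.poll(0.1)

try:
    print(future.result())
except Exception as exc:
    logger.error("describe_cluster failed: %s", exc)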

pranavrth avatar May 31 '24 10:05 pranavrth

  1. I am updating the example as well to reflect this behaviour.
  2. Please use the logger config property instead of passing logger as an argument to the AdminClient for now; see the sketch below. I am also adding logger as an argument to the AdminClient.
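
A compact sketch of the two construction paths, assuming an illustrative broker address; the keyword form is what the follow-up fix is expected to enable:

import logging

from confluent_kafka.admin import AdminClient

logger = logging.getLogger("kafka_admin")

# Broker address is illustrative only.
config = {"bootstrap.servers": "127.0.0.1:9092"}

# Works today: pass the logger through the config dict.
admin_client = AdminClient({**config, "logger": logger})

# After the fix mentioned above lands, the keyword form should also be accepted:
# admin_client = AdminClient(config, logger=logger)

# Either way, call poll() so queued log events reach the logger.
admin_client.poll(1.0)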

pranavrth avatar May 31 '24 12:05 pranavrth

PR for the above changes - https://github.com/confluentinc/confluent-kafka-python/pull/1758

pranavrth avatar Jun 03 '24 11:06 pranavrth

Closing this issue as the working approach is shown in the example.

Also, fixed an issue where AdminClient was not accepting logger as an argument.

pranavrth avatar Jul 08 '24 09:07 pranavrth