Pass extra args and kwargs to confluent-kafka Consumer
Description
This PR fixes an issue in AutoInstrumentedConsumer which wasn't passing extra args and kwargs to superclass Consumer. This was causing issues when a caller need to pass extra arguments like logger.
The use of logger is tested here: https://github.com/confluentinc/confluent-kafka-python/blob/master/tests/test_log.py#L28-L30
Type of change
- [x] Bug fix (non-breaking change which fixes an issue)
How Has This Been Tested?
To reproduce the issue we instantiate a Consumer object with extra arguments (e.g. logger) before and after instrumenting by ConfluentKafkaInstrumentor().instrument()
Instantiating the uninstrumented Consumer:
In [1]: from confluent_kafka import Consumer
In [2]: conf2 = {'bootstrap.servers': "localhost:9092", 'group.id': "foo", 'auto.offset.reset': 'smallest'}
In [3]: Consumer(conf2, logger=55)
Out[3]: <cimpl.Consumer at 0x7fa3462567c0>
We can pass keyword arguments. Below is the same after we instrument Consumer:
In [1]: from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
In [2]: ConfluentKafkaInstrumentor().instrument()
In [3]: from confluent_kafka import Consumer
In [4]: conf2 = {'bootstrap.servers': "localhost:9092", 'group.id': "foo", 'auto.offset.reset': 'smallest'}
In [5]: Consumer(conf2, logger=55)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[5], line 1
----> 1 Consumer(conf2, logger=55)
TypeError: __init__() got an unexpected keyword argument 'logger'
Does This PR Require a Core Repo Change?
- [ ] Yes. - Link to PR:
- [x] No.
Checklist:
See contributing.md for styleguide, changelog guidelines, and more.
- [x] Followed the style guidelines of this project
The committers listed above are authorized under a signed CLA.
- :white_check_mark: login: xrmx / name: Riccardo Magliocchetti (d9f9fd09f005085b0fbb0466f60fae9a976fc9c9)
From https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/cimpl.pyi#L168C7-L168C15, that does not match their definition though:
class Consumer:
def __init__(self, config: Dict[str, Union[str, int, float, bool, None]]) -> None: ...
OTOH, the C implementation: https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/src/Consumer.c#L1783
static int Consumer_init(PyObject *selfobj, PyObject *args, PyObject *kwargs) {