kafka-python
BUG: KafkaProducer leaves garbage
Hello,
I found that it is not safe to turn off garbage collection when using KafkaProducer,
because it leaves behind garbage that is only reclaimed by the cyclic collector.
Steps to reproduce:
import gc
import time

from kafka import KafkaProducer

gc.disable()
producer = KafkaProducer(
    bootstrap_servers=['some-server'],
    acks=0,
    api_version=(1, 0, 0)
)
message = "test_message"
encoded_message = message.encode("utf-8")
producer.send("some-topic", encoded_message)
# Give the producer a chance to do some work
time.sleep(1)
# DEBUG_SAVEALL keeps every unreachable object in gc.garbage instead of freeing it
gc.set_debug(gc.DEBUG_SAVEALL)
gc.collect()
gc.set_debug(0)
print("Collected garbage")
if gc.garbage:
    print("Found some garbage %d = %r" % (len(gc.garbage), gc.garbage))
Note: run this as a script. When I run it in an interactive Python shell, it does not leave any garbage.
Output:
Collected garbage
Found some garbage 34 = [_pytest.store.StoreKey[typing.Union[_pytest.config.Config, NoneType]], {'__module__': '_pytest.store', '__doc__': 'StoreKey is an object used as a key to a Store.\n\n A StoreKey is associated with the type T of the value of the key.\n\n A StoreKey is unique and cannot conflict with another key.\n ', '__slots__': (), '__origin__': _pytest.store.StoreKey, '__extra__': None, '_gorg': _pytest.store.StoreKey, '__abstractmethods__': frozenset(), '_abc_registry': <_weakrefset.WeakSet object at 0x7f2633a2c898>, '_abc_cache': <_weakrefset.WeakSet object at 0x7f2633a2c860>, '_abc_generic_negative_cache': <_weakrefset.WeakSet object at 0x7f2633a2c940>, '_abc_generic_negative_cache_version': 40, '__parameters__': (), '__args__': (typing.Union[_pytest.config.Config, NoneType],), '__next_in_mro__': <class 'object'>, '__orig_bases__': (typing.Generic[~T],), '__tree_hash__': -4243921850862713175}, (_pytest.store.StoreKey[typing.Union[_pytest.config.Config, NoneType]], _pytest.store.StoreKey, typing.Generic, <class 'object'>), (_pytest.store.StoreKey,), (typing.Union[_pytest.config.Config, NoneType],), typing.Dict[str, _ForwardRef('FixtureDef[Any]')], (typing.Dict[str, _ForwardRef('FixtureDef[Any]')], typing.Dict, <class 'dict'>, typing.MutableMapping, <class 'collections.abc.MutableMapping'>, typing.Mapping, <class 'collections.abc.Mapping'>, typing.Collection, <class 'collections.abc.Collection'>, <class 'collections.abc.Sized'>, typing.Iterable, <class 'collections.abc.Iterable'>, typing.Container, <class 'collections.abc.Container'>, typing.Generic, <class 'object'>), _pytest.store.StoreKey[typing.Dict[str, _ForwardRef('FixtureDef[Any]')]], (_pytest.store.StoreKey[typing.Dict[str, _ForwardRef('FixtureDef[Any]')]], _pytest.store.StoreKey, typing.Generic, <class 'object'>), {'__module__': 'typing', '__slots__': (), '__new__': <staticmethod object at 0x7f2638488278>, '__origin__': typing.Dict, '__extra__': <class 'dict'>, '_gorg': typing.Dict, '__doc__': 
None, '__abstractmethods__': frozenset(), '_abc_registry': <_weakrefset.WeakSet object at 0x7f263782f320>, '_abc_cache': <_weakrefset.WeakSet object at 0x7f263782f2b0>, '_abc_generic_negative_cache': <_weakrefset.WeakSet object at 0x7f263782f438>, '_abc_generic_negative_cache_version': 40, '__parameters__': (), '__args__': (<class 'str'>, _ForwardRef('FixtureDef[Any]')), '__next_in_mro__': <class 'object'>, '__orig_bases__': (<class 'dict'>, typing.MutableMapping[~KT, ~VT]), '__subclasshook__': <function _make_subclasshook.<locals>.__extrahook__ at 0x7f26339b9b70>, '__tree_hash__': -2573890543485547523}, (typing.Dict, <class 'dict'>, typing.MutableMapping), {'__module__': '_pytest.store', '__doc__': 'StoreKey is an object used as a key to a Store.\n\n A StoreKey is associated with the type T of the value of the key.\n\n A StoreKey is unique and cannot conflict with another key.\n ', '__slots__': (), '__origin__': _pytest.store.StoreKey, '__extra__': None, '_gorg': _pytest.store.StoreKey, '__abstractmethods__': frozenset(), '_abc_registry': <_weakrefset.WeakSet object at 0x7f2633a2c898>, '_abc_cache': <_weakrefset.WeakSet object at 0x7f2633a2c860>, '_abc_generic_negative_cache': <_weakrefset.WeakSet object at 0x7f2633a2c940>, '_abc_generic_negative_cache_version': 40, '__parameters__': (), '__args__': (typing.Dict[str, _ForwardRef('FixtureDef[Any]')],), '__next_in_mro__': <class 'object'>, '__orig_bases__': (typing.Generic[~T],), '__tree_hash__': -2080973868901861766}, (_pytest.store.StoreKey,), (<class 'str'>, _ForwardRef('FixtureDef[Any]')), <function _make_subclasshook.<locals>.__extrahook__ at 0x7f26339b9b70>, (typing.Dict[str, _ForwardRef('FixtureDef[Any]')],), _ForwardRef('FixtureDef[Any]'), (<cell at 0x7f26339b7168: GenericMeta object at 0x28289f8>,), <cell at 0x7f26339b7168: GenericMeta object at 0x28289f8>, <kafka.producer.future.FutureProduceResult object at 0x7f263315b0b8>, [<bound method FutureRecordMetadata._produce_success of 
<kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>>], {'is_done': True, 'value': (-1, None, None), 'exception': None, '_callbacks': [<bound method FutureRecordMetadata._produce_success of <kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>>], '_errbacks': [<bound method Future.failure of <kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>>], 'topic_partition': TopicPartition(topic='some-topic', partition=0), '_latch': <gevent._gevent_cevent.Event object at 0x7f263340f348>}, [<bound method Future.failure of <kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>>], <gevent._gevent_cevent.Event object at 0x7f263340f348>, [], <kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>, [], {'is_done': True, 'value': RecordMetadata(topic='some-topic', partition=0, topic_partition=TopicPartition(topic='some-topic', partition=0), offset=-1, timestamp=1629906912582, log_start_offset=None, checksum=None, serialized_key_size=-1, serialized_value_size=12, serialized_header_size=-1), 'exception': None, '_callbacks': [], '_errbacks': [], '_produce_future': <kafka.producer.future.FutureProduceResult object at 0x7f263315b0b8>, 'args': (0, 1629906912582, None, -1, 12, -1)}, [], (0, 1629906912582, None, -1, 12, -1), <bound method FutureRecordMetadata._produce_success of <kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>>, <bound method Future.failure of <kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>>, (-1, None, None), RecordMetadata(topic='some-topic', partition=0, topic_partition=TopicPartition(topic='some-topic', partition=0), offset=-1, timestamp=1629906912582, log_start_offset=None, checksum=None, serialized_key_size=-1, serialized_value_size=12, serialized_header_size=-1)]
A lot of the garbage seems to come from kafka.producer.future.FutureRecordMetadata and kafka.producer.future.FutureProduceResult.
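To make it easier to see which types dominate the dump, the contents of gc.garbage can be grouped by type. The following is only a sketch: `summarize_garbage`, `Node` and `demo` are hypothetical names of my own, and the demo uses a stand-in class rather than a real producer so that it runs without a broker.

```python
import gc
from collections import Counter


def summarize_garbage(objects):
    """Count objects by the module-qualified name of their type."""
    counts = Counter()
    for obj in objects:
        cls = type(obj)
        counts["%s.%s" % (cls.__module__, cls.__qualname__)] += 1
    return counts


class Node:
    """Hypothetical stand-in object used to build a reference cycle."""

    def __init__(self):
        self.other = None


def demo():
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        gc.collect()  # drop anything already pending before the demo
        a, b = Node(), Node()
        a.other, b.other = b, a  # two objects referencing each other
        del a, b
        gc.set_debug(gc.DEBUG_SAVEALL)
        gc.collect()
        gc.set_debug(0)
        summary = summarize_garbage(gc.garbage)
        gc.garbage.clear()
        return summary
    finally:
        if was_enabled:
            gc.enable()


if __name__ == "__main__":
    for name, count in demo().most_common():
        print(count, name)
```

In the reproduction script above, calling `summarize_garbage(gc.garbage)` after `gc.collect()` would give the same kind of per-type count for the real dump.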
Are there reference loops somewhere?
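Reading the dump, one candidate loop is visible: the FutureProduceResult holds a `_callbacks` list containing the bound method `FutureRecordMetadata._produce_success`, and the FutureRecordMetadata's attributes contain `_produce_future` pointing back at that FutureProduceResult. Below is a minimal sketch of that pattern, assuming this is what happens; the classes are hypothetical stand-ins written for illustration, not kafka-python's actual code.

```python
import gc
import weakref


class ProduceResult:
    """Hypothetical stand-in for a future that stores callbacks."""

    def __init__(self):
        self._callbacks = []

    def add_callback(self, fn):
        self._callbacks.append(fn)


class RecordMetadataFuture:
    """Hypothetical stand-in that registers its own bound method as a callback."""

    def __init__(self, produce_future):
        self._produce_future = produce_future
        # The bound method holds a strong reference to self, closing the loop:
        # self -> _produce_future -> _callbacks -> bound method -> self
        produce_future.add_callback(self._on_success)

    def _on_success(self, value):
        pass


def survives_without_gc():
    """Return (alive after del, alive after gc.collect()) for the cycle."""
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        future = RecordMetadataFuture(ProduceResult())
        probe = weakref.ref(future)
        del future
        alive_before = probe() is not None  # reference counting cannot free it
        gc.collect()
        alive_after = probe() is not None  # the cyclic collector can
        return alive_before, alive_after
    finally:
        if was_enabled:
            gc.enable()


if __name__ == "__main__":
    print(survives_without_gc())
```

If this is indeed the cause, the objects can only be freed by the cyclic collector, which would explain why they pile up under `gc.disable()`.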