confluent-kafka-python
confluent-kafka-python copied to clipboard
Segmentation faults (and the like) and failure to get assignments in multithreaded/asyncio pytest environment
Description
I have a pytest suite that will:
- Create a kafka testcontainer as a module fixture
- Create a bunch of topics as a module fixture using the admin client
- For each test case: i. Create one Consumer object per topic in a threadpool, all with auto.offset.reset=earliest ii. Subscribe to them in a threadpool. iii. Wait for each Consumer to receive an assignment. iv. In a threadpool, create a bunch of Producers, send messages to the topics, and flush. v. Wait for each Consumer to process the correct number of messages. vi. Finally, close all the consumers, regardless of exceptions raised elsewhere.
This test suite is notoriously prone to segmentation faults and the like that crash the entire interpreter and are very disruptive.
I have heard the confluent_kafka is thread safe, but I have not experienced that to be the case. And I'm open to the possibility that this is user error on my part. If so, please, show me the way.
The errors tend to happen after a first test case has run and during the second test case where the Consumer is attempting to monitor for assignments.
There are many different presentations:
INTERNAL ERROR: librdkafka rd_kafka_poll_cb:4113: Can't handle op type XMIT_BUF (0x8)
Assertion failed: (!*"INTERNAL ERROR IN LIBRDKAFKA"), function rd_kafka_poll_cb, file rdkafka.c, line 4113.
Fatal Python error: Aborted
INTERNAL ERROR: librdkafka rd_kafka_poll_cb:4113: Can't handle op type NODE_UPDATE (0x7)
Assertion failed: (!*"INTERNAL ERROR IN LIBRDKAFKA"), function rd_kafka_poll_cb, file rdkafka.c, line 4113.
INTERNAL ERROR: librdkafka rd_kafka_poll_cb:4113: Can't handle op type CONNECT (0x35)
Assertion failed: (!*"INTERNAL ERROR IN LIBRDKAFKA"), function rd_kafka_poll_cb, file rdkafka.c, line 4113.
INTERNAL ERROR: librdkafka rd_kafka_poll_cb:4113: Can't handle op type REPLY:GET_REBALANCE_PROTOCOL (0x4000003a)
Assertion failed: (!*"INTERNAL ERROR IN LIBRDKAFKA"), function rd_kafka_poll_cb, file rdkafka.c, line 4113.
segmentation fault
INTERNAL ERROR: librdkafka rd_kafka_poll_cb:4113: Can't handle op type REPLY:NODE_UPDATE (0x40000007)
Assertion failed: (!*"INTERNAL ERROR IN LIBRDKAFKA"), function rd_kafka_poll_cb, file rdkafka.c, line 4113.
Additionally, if the timeout argument to poll() is set, the consumer never appears to get an assignment at all.
How to reproduce
Here's a pretty elaborate script that reliably reproduces the total range of errors that I see with a lot of different knobs to tweak.
import argparse
import asyncio
import logging
import os
import sys
import threading
import time
from asyncio import CancelledError, Task
from contextlib import ExitStack
from functools import partial
import anyio.to_thread
from confluent_kafka import Consumer, KafkaError, KafkaException, Producer
from confluent_kafka.admin import AdminClient, NewTopic
logging.basicConfig(level=logging.INFO)
confluent_kafka_logger = logging.getLogger("confluent_kafka")
confluent_kafka_logger.setLevel(logging.INFO)
confluent_kafka_logger.addHandler(logging.StreamHandler())
CONFIG = {
"NUM_TOPICS": 1,
"NUM_MESSAGES": 1,
"NUM_EXPERIMENTS": sys.maxsize,
"TOPIC_PREFIX": "test-topic",
"MAX_NUM_THREADS": 200,
"KAFKA_BOOTSTRAP_SERVERS": None,
"LOCK_CONSUMER_OPERATIONS": False,
"POLL_TIMEOUT": None,
"CHECK_ASSIGNMENTS_TIMEOUT": 15,
"CHECK_MESSAGES_ARE_CONSUMED_TIMEOUT": 15,
**os.environ,
}
SINGLETONS = {}
def recreate_topic(topic_name):
kafka_config = {
"bootstrap.servers": CONFIG["KAFKA_BOOTSTRAP_SERVERS"],
}
admin_client = AdminClient(kafka_config)
topics = admin_client.list_topics(timeout=10).topics
if topic_name in topics:
print(f"Deleting existing topic '{topic_name}'...")
fs = admin_client.delete_topics([topic_name], operation_timeout=30)
for topic, f in fs.items():
try:
f.result()
print(f"Topic '{topic}' successfully deleted.")
except KafkaException as e:
print(f"Failed to delete topic '{topic}': {e}")
while topic_name in admin_client.list_topics().topics:
time.sleep(1)
print(f"Confirmed that topic {topic_name} does not exist.")
print(f"Creating topic '{topic_name}'...")
new_topic = NewTopic(topic_name, num_partitions=3, replication_factor=1)
try:
admin_client.create_topics([new_topic], operation_timeout=30)
print(f"Topic '{topic_name}' created.")
except KafkaException as e:
print(f"Failed to create topic '{topic_name}': {e}")
# Loop until topic exists
while topic_name not in admin_client.list_topics().topics:
time.sleep(1)
print(f"Confirmed that topic {topic_name} exists.")
class LockingConsumer:
"""A patched version of the Consumer class that locks operations."""
def __init__(self, config: dict, logger):
self.config = config
self.consumer = Consumer(config, logger=logger)
self.lock = threading.Lock()
def __getattr__(self, item):
obj = getattr(self.consumer, item)
if callable(obj):
return self._wrap_callable(obj)
def _wrap_callable(self, func):
def wrapped(*args, **kwargs):
with self.lock:
return func(*args, **kwargs)
return wrapped
class AsyncConsumer:
def __init__(self, topic_name, num_messages):
self.topic_name = topic_name
self.num_messages = num_messages
consumer_config = {
"bootstrap.servers": CONFIG["KAFKA_BOOTSTRAP_SERVERS"],
"group.id": f"test_group.{topic_name}",
"auto.offset.reset": "latest",
"enable.auto.commit": "false",
}
if CONFIG["LOCK_CONSUMER_OPERATIONS"]:
consumer_class = LockingConsumer
else:
consumer_class = Consumer
self.consumer = consumer_class(consumer_config, logger=confluent_kafka_logger)
self.task: Task | None = None
def _log(self, msg):
print(f"Consumer({self.topic_name}): {msg}.")
async def assignments(self):
return await anyio.to_thread.run_sync(
self.consumer.assignment, limiter=SINGLETONS["limiter"]
)
async def start(self):
self._log("Subscribing.")
await anyio.to_thread.run_sync(
self.consumer.subscribe, [self.topic_name], limiter=SINGLETONS["limiter"]
)
try:
self.task = asyncio.create_task(self.poll_loop())
except CancelledError:
self._log("Timeout.")
except KafkaException as e:
self._log(f"{e}")
async def close(self):
if self.task is not None and not self.task.done():
self._log("Cancelling poll_loop().")
self.task.cancel()
else:
self._log("poll_loop() already done.")
self._log("Closing consumer")
await anyio.to_thread.run_sync(
self.consumer.close, limiter=SINGLETONS["limiter"]
)
self._log("Consumer closed")
async def poll_loop(self):
while True:
self._log("Polling.")
msg = await anyio.to_thread.run_sync(
(
self.consumer.poll
if CONFIG["POLL_TIMEOUT"] is None
else partial(self.consumer.poll, timeout=CONFIG["POLL_TIMEOUT"])
),
limiter=SINGLETONS["limiter"],
)
self._log(f"Message received: {msg.value()}")
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
raise KafkaException(msg.error())
await anyio.to_thread.run_sync(
self.consumer.commit, limiter=SINGLETONS["limiter"]
)
self.num_messages -= 1
self._log(f"{self.num_messages} messages left.")
async def add_messages(topic_name):
producer_config = {"bootstrap.servers": CONFIG["KAFKA_BOOTSTRAP_SERVERS"]}
producer = Producer(producer_config)
for i in range(CONFIG["NUM_MESSAGES"]):
print(f"Producer({topic_name}) - producing message {i}.")
await anyio.to_thread.run_sync(
producer.produce, topic_name, f"{i}", limiter=SINGLETONS["limiter"]
)
await anyio.to_thread.run_sync(producer.poll, 0, limiter=SINGLETONS["limiter"])
await anyio.to_thread.run_sync(producer.flush, limiter=SINGLETONS["limiter"])
async def check_assignments(consumer_handlers):
assignments = []
while not assignments:
handler_assignments = await asyncio.gather(
*[consumer_handler.assignments() for consumer_handler in consumer_handlers]
)
is_assigned = [len(assignment) > 0 for assignment in handler_assignments]
print(f"Assignments: {is_assigned}")
if sum(is_assigned) != len(consumer_handlers):
print(f"Only {sum(is_assigned)} handlers have assignments, sleeping")
await asyncio.sleep(1)
else:
print("All handlers have assignments.")
return
async def check_messages_are_consumed(consumer_handlers):
while any(
consumer_handler.num_messages > 0 for consumer_handler in consumer_handlers
):
await asyncio.sleep(0.1)
async def run_test_case(topic_names):
print("--- Starting consumers. ---")
consumer_handlers = [
AsyncConsumer(topic_name, CONFIG["NUM_MESSAGES"]) for topic_name in topic_names
]
await asyncio.gather(
*[consumer_handler.start() for consumer_handler in consumer_handlers]
)
try:
print("--- Waiting for assignments. ---")
await asyncio.wait_for(
check_assignments(consumer_handlers),
timeout=CONFIG["CHECK_ASSIGNMENTS_TIMEOUT"],
)
print("--- Adding messages. ---")
await asyncio.gather(*[add_messages(topic_name) for topic_name in topic_names])
print("--- Waiting for messages to be consumed. ---")
await asyncio.wait_for(
check_messages_are_consumed(consumer_handlers),
timeout=CONFIG["CHECK_MESSAGES_ARE_CONSUMED_TIMEOUT"],
)
finally:
print("--- Closing consumers. ---")
await asyncio.gather(
*[consumer_handler.close() for consumer_handler in consumer_handlers]
)
print("--- Test case done. ---")
async def run_experiments(topic_names):
for i in range(CONFIG["NUM_EXPERIMENTS"]):
print(f"------ Running experiment {i} ------")
await run_test_case(topic_names)
async def create_topics():
tasks = []
topic_names = []
for i in range(CONFIG["NUM_TOPICS"]):
topic_name = f"{CONFIG['TOPIC_PREFIX']}-{i}"
topic_names.append(topic_name)
tasks.append(
anyio.to_thread.run_sync(
recreate_topic, topic_name, limiter=SINGLETONS["limiter"]
)
)
await asyncio.gather(*tasks)
return topic_names
async def run_test_application():
topic_names = await create_topics()
await run_experiments(topic_names)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--num-topics", type=int, default=CONFIG["NUM_TOPICS"])
parser.add_argument("--num-messages", type=int, default=CONFIG["NUM_MESSAGES"])
parser.add_argument(
"--num-experiments", type=int, default=CONFIG["NUM_EXPERIMENTS"]
)
parser.add_argument("--topic-prefix", type=str, default=CONFIG["TOPIC_PREFIX"])
parser.add_argument(
"--max-num-threads", type=int, default=CONFIG["MAX_NUM_THREADS"]
)
parser.add_argument("--lock-consumer-operations", action="store_true")
parser.add_argument("--poll-timeout", type=float, default=CONFIG["POLL_TIMEOUT"])
parser.add_argument(
"--check-assignments-timeout",
type=float,
default=CONFIG["CHECK_ASSIGNMENTS_TIMEOUT"],
)
parser.add_argument(
"--check-messages-are-consumed-timeout",
type=float,
default=CONFIG["CHECK_MESSAGES_ARE_CONSUMED_TIMEOUT"],
)
parser.add_argument(
"--kafka-bootstrap-servers", type=str, default=CONFIG["KAFKA_BOOTSTRAP_SERVERS"]
)
args = parser.parse_args()
CONFIG["NUM_TOPICS"] = args.num_topics
CONFIG["NUM_MESSAGES"] = args.num_messages
CONFIG["NUM_EXPERIMENTS"] = args.num_experiments
CONFIG["TOPIC_PREFIX"] = args.topic_prefix
CONFIG["KAFKA_BOOTSTRAP_SERVERS"] = args.kafka_bootstrap_servers
CONFIG["MAX_NUM_THREADS"] = args.max_num_threads
CONFIG["LOCK_CONSUMER_OPERATIONS"] = args.lock_consumer_operations
CONFIG["POLL_TIMEOUT"] = args.poll_timeout
CONFIG["CHECK_ASSIGNMENTS_TIMEOUT"] = args.check_assignments_timeout
CONFIG["CHECK_MESSAGES_ARE_CONSUMED_TIMEOUT"] = (
args.check_messages_are_consumed_timeout
)
with ExitStack() as stack:
if CONFIG["KAFKA_BOOTSTRAP_SERVERS"] is None:
import testcontainers.kafka
kafka = stack.enter_context(testcontainers.kafka.KafkaContainer())
CONFIG["KAFKA_BOOTSTRAP_SERVERS"] = kafka.get_bootstrap_server()
print(f"NUM_TOPICS: {CONFIG['NUM_TOPICS']}")
print(f"NUM_MESSAGES: {CONFIG['NUM_MESSAGES']}")
print(f"NUM_EXPERIMENTS: {CONFIG['NUM_EXPERIMENTS']}")
print(f"TOPIC_PREFIX: {CONFIG['TOPIC_PREFIX']}")
print(f"KAFKA_BOOTSTRAP_SERVERS: {CONFIG['KAFKA_BOOTSTRAP_SERVERS']}")
print(f"MAX_NUM_THREADS: {CONFIG['MAX_NUM_THREADS']}")
print(f"LOCK_CONSUMER_OPERATIONS: {CONFIG['LOCK_CONSUMER_OPERATIONS']}")
print(f"POLL_TIMEOUT: {CONFIG['POLL_TIMEOUT']}")
print(f"CHECK_ASSIGNMENTS_TIMEOUT: {CONFIG['CHECK_ASSIGNMENTS_TIMEOUT']}")
print(
f"CHECK_MESSAGES_ARE_CONSUMED_TIMEOUT: {CONFIG['CHECK_MESSAGES_ARE_CONSUMED_TIMEOUT']}"
)
# This breaks wait_for_assignments() because the first call to poll() causes
# a lock issue
if CONFIG["LOCK_CONSUMER_OPERATIONS"] and CONFIG["POLL_TIMEOUT"] in (None, -1):
print(
"Warning, blocking poll() call will prevent another thread from acquiring the lock."
)
# For whatever reason, consumers don't get assigned partitions if this is
# set to a value other than None. Even passing in None to poll() causes issues
if CONFIG["POLL_TIMEOUT"] not in (None, -1):
print(
"Warning, setting POLL_TIMEOUT typically prevents assignments from ever happening."
)
SINGLETONS["limiter"] = anyio.CapacityLimiter(CONFIG["MAX_NUM_THREADS"])
asyncio.run(run_test_application(), debug=True)
Assignments never happening when poll(1.0)
https://gist.github.com/andreaimprovised/6221cba7c0be98ee3189dd517998bda3
INTERNAL ERROR with 3 topics
https://gist.github.com/andreaimprovised/d80eedeea6ef7beb44fff228df1942da
segmentation fault with 12 topics
https://gist.github.com/andreaimprovised/5bc6acdc05fecb35d7cb7f20295c31f7
Segmentation fault with just 1 topic and 1 message per test case
https://gist.github.com/andreaimprovised/1fe7b9f8be0d34d8a6dc40827c802934
Additional requirements:
testcontainers==4.7.2
anyio==4.4.0
I'm currently using python 3.10.
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (
confluent_kafka.version()andconfluent_kafka.libversion()):
In [2]: confluent_kafka.version() Out[2]: ('2.5.0', 33882112)
- [x] Apache Kafka broker version:
confluentinc/cp-kafka:7.6.0
- [x] Client configuration:
{...}
It's in the code.
- [x] Operating system:
I've seen this on darwin arm64 and linux x86_64.
- [x] Provide client logs (with
'debug': '..'as necessary)
Here is an example
- [ ] Provide broker log excerpts
Hmmm, I'll try to figure out how to get these.
- [ ] Critical issue
It's not critical, but it depends on how critical you think automated test suites are.
I've seen similar in our testing setup.
We use async subscribers from FastStream but the end result seems to be the same as it uses confluent under the hood.
At first it appeared to be related to our pants+nix test setup on python 3.11 but I have reproduced it running pytest within a venv on 3.12 as well.
I also run our tests within a docker compose application, and have never seen it occur there.
The main difference between the environments is between linux_aarch64 and macos_amd64 I suppose but I have also noticed the async selector is different:
KqueueSelectorlocallyEpollSelectorwithin the containers
A coworker not on macOS has reported the issue as well and it's also appeared in CI (GH Actions) so seemingly not macOS specific.
(venv) ~/org/components/gsf git:GSF-654-convert-threaded* -> python -m pytest
===================================================== test session starts =====================================================
platform darwin -- Python 3.12.5, pytest-7.4.4, pluggy-1.5.0
rootdir: /Users/ntr/org/components/gsf
configfile: pyproject.toml
testpaths: tests
plugins: asyncio-0.23.8, env-1.1.3, cov-4.1.0, anyio-3.7.1, docker-3.1.1
asyncio: mode=Mode.AUTO
collecting ... Variable Name: ENABLE_METRICS, Value: true
Variable Name: ENABLE_LOADTESTING, Value: None
Variable Name: SEND_NOTIFICATIONS, Value: true
Variable Name: ENABLE_SENTRY, Value: None
Variable Name: ENABLE_SENTRY_TRACING, Value: None
Variable Name: IEEE_LOGGING_CLOUDWATCH, Value: None
collected 108 items
tests/gsf_tests/test_adaptors.py .... [ 3%]
tests/gsf_tests/test_config_manager.py .. [ 5%]
tests/gsf_tests/test_control_responses.py ..Fatal Python error: Segmentation fault
Current thread 0x000000016ea73000 (most recent call first):
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 807 in run
File "/opt/homebrew/Cellar/[email protected]/3.12.5/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1075 in _bootstrap_inner
File "/opt/homebrew/Cellar/[email protected]/3.12.5/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1032 in _bootstrap
Thread 0x00000001f57a8f40 (most recent call first):
File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
File "<frozen importlib._bootstrap_external>", line 1289 in create_module
File "<frozen importlib._bootstrap>", line 813 in module_from_spec
File "<frozen importlib._bootstrap>", line 921 in _load_unlocked
File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 1415 in _handle_fromlist
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/multidict/_compat.py", line 12 in <module>
File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
File "<frozen importlib._bootstrap_external>", line 995 in exec_module
File "<frozen importlib._bootstrap>", line 935 in _load_unlocked
File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/multidict/__init__.py", line 9 in <module>
File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
File "<frozen importlib._bootstrap_external>", line 995 in exec_module
File "<frozen importlib._bootstrap>", line 935 in _load_unlocked
File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/aiohttp/hdrs.py", line 7 in <module>
File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
File "<frozen importlib._bootstrap_external>", line 995 in exec_module
File "<frozen importlib._bootstrap>", line 935 in _load_unlocked
File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 1415 in _handle_fromlist
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/aiohttp/__init__.py", line 5 in <module>
File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
File "<frozen importlib._bootstrap_external>", line 995 in exec_module
File "<frozen importlib._bootstrap>", line 935 in _load_unlocked
File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
File "/Users/ntr/org/components/gsf/src/gsf/data_plane/entrypoints/upstream.py", line 11 in <module>
File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
File "<frozen importlib._bootstrap_external>", line 995 in exec_module
File "<frozen importlib._bootstrap>", line 935 in _load_unlocked
File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
File "/Users/ntr/org/components/gsf/src/gsf/data_plane/entrypoints/control_responses_subscriber.py", line 8 in <module>
File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
File "<frozen importlib._bootstrap_external>", line 995 in exec_module
File "<frozen importlib._bootstrap>", line 935 in _load_unlocked
File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
File "/Users/ntr/org/components/gsf/tests/gsf_tests/test_control_responses.py", line 86 in test_control_responses_subscriber
File "/opt/homebrew/Cellar/[email protected]/3.12.5/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/events.py", line 88 in _run
File "/opt/homebrew/Cellar/[email protected]/3.12.5/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 1986 in _run_once
File "/opt/homebrew/Cellar/[email protected]/3.12.5/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 641 in run_forever
File "/opt/homebrew/Cellar/[email protected]/3.12.5/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 674 in run_until_complete
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pytest_asyncio/plugin.py", line 906 in inner
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/python.py", line 194 in pytest_pyfunc_call
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103 in _multicall
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120 in _hookexec
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513 in __call__
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/python.py", line 1792 in runtest
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pytest_asyncio/plugin.py", line 440 in runtest
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 169 in pytest_runtest_call
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103 in _multicall
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120 in _hookexec
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513 in __call__
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 262 in <lambda>
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 341 in from_call
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 261 in call_runtest_hook
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 222 in call_and_report
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 133 in runtestprotocol
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 114 in pytest_runtest_protocol
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103 in _multicall
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120 in _hookexec
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513 in __call__
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/main.py", line 350 in pytest_runtestloop
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103 in _multicall
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120 in _hookexec
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513 in __call__
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/main.py", line 325 in _main
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/main.py", line 271 in wrap_session
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/main.py", line 318 in pytest_cmdline_main
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103 in _multicall
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120 in _hookexec
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513 in __call__
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/config/__init__.py", line 169 in main
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/config/__init__.py", line 192 in console_main
File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pytest/__main__.py", line 5 in <module>
File "<frozen runpy>", line 88 in _run_code
File "<frozen runpy>", line 198 in _run_module_as_main
Extension modules: lxml._elementpath, lxml.etree, _cffi_backend, confluent_kafka.cimpl, charset_normalizer.md (total: 5)
[1] 14261 segmentation fault python -m pytest
the test is basically this:
message_received_event = Event()
async def subscriber(event: CloudEvent):
assert event.data
if mrid in event.data.get("request_body"):
message_received_event.set()
broker.subscriber(CONTROL_RESPONSES_TOPIC["name"], group_id="test_event_router")(subscriber)
# Start listening
await broker.start()
payload = get_control_response_payload(status=1, subject=mrid)
res = await async_der_client.post(url=url, content=payload)
assert res.status_code == 201
# Wait for the event to be set with a timeout to avoid hanging indefinitely
try:
await asyncio.wait_for(message_received_event.wait(), timeout=5.0)
except asyncio.TimeoutError:
log.error("Timed out waiting for message received event")
assert message_received_event.is_set()
await broker.close()
Reproduced it with lldb, looks pretty cut and dry:
tests/gsf_tests/test_config_manager.py .. [ 5%]
Process 23727 stopped
* thread #9, stop reason = EXC_BAD_ACCESS (code=1, address=0xae8)
frame #0: 0x0000000105d9b5b8 librdkafka.dylib`rd_kafka_q_pop_serve + 888
librdkafka.dylib`rd_kafka_q_pop_serve:
-> 0x105d9b5b8 <+888>: ldr w8, [x19, #0xae8]
0x105d9b5bc <+892>: cmp w8, #0x1
0x105d9b5c0 <+896>: b.ne 0x105d9b608 ; <+968>
0x105d9b5c4 <+900>: add x0, sp, #0x10
Target 0: (Python) stopped.
(lldb) bt
* thread #9, stop reason = EXC_BAD_ACCESS (code=1, address=0xae8)
* frame #0: 0x0000000105d9b5b8 librdkafka.dylib`rd_kafka_q_pop_serve + 888
frame #1: 0x0000000105d70488 librdkafka.dylib`rd_kafka_consume0 + 236
frame #2: 0x0000000102959cb8 cimpl.cpython-312-darwin.so`Consumer_poll + 132
frame #3: 0x0000000100ac26d0 Python`cfunction_call + 72
frame #4: 0x0000000100a70c94 Python`_PyObject_MakeTpCall + 128
frame #5: 0x0000000100b849f4 Python`context_run + 104
frame #6: 0x0000000100ac1db8 Python`cfunction_vectorcall_FASTCALL_KEYWORDS + 92
frame #7: 0x0000000100b66bf8 Python`_PyEval_EvalFrameDefault + 50272
frame #8: 0x0000000100a73d5c Python`method_vectorcall + 372
frame #9: 0x0000000100c36720 Python`thread_run + 144
frame #10: 0x0000000100bcc850 Python`pythread_wrapper + 48
frame #11: 0x000000018d959f94 libsystem_pthread.dylib`_pthread_start + 136
```
our experience is that if you call close on the consumer while it's polling then segfaults or hangs occur. We've added some locking around the poll/consume and close calls so they can't be called together which seems to have resolved the problem.
Thanks for reporting this with lots of details. Apologies no one responded earlier. There's some related segfaults around poll and closing: https://github.com/confluentinc/confluent-kafka-python/issues/457 https://github.com/confluentinc/confluent-kafka-python/issues/1761 I'm marking this higher priority to get some attention to consistent reproduction and tracking down the thread safety error.