testcontainers-python icon indicating copy to clipboard operation
testcontainers-python copied to clipboard

Bug: Segmentation fault when using with confluent kafka consumer

Open trajano opened this issue 8 months ago • 1 comments

Describe the bug

Creating an asyncio test case that uses the kafka container on completion of the test (the test itself is successful) the pytest environment segfaults.

To Reproduce

Provide a self-contained code snippet that illustrates the bug or unexpected behavior. Ideally, send a Pull Request to illustrate with a test that illustrates the problem.

import asyncio
import time
import sys
from confluent_kafka import Consumer
import pytest
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.kafka import KafkaContainer

@pytest.fixture(scope="module")
def kafka_container():
    """Provide a temporary Redis container."""
    with KafkaContainer("mirror.gcr.io/confluentinc/cp-kafka:7.6.0") as container:
        wait_for_logs(container, "Awaiting socket connections on 0.0.0.0:9093")
        for topic in ["mytopic"]:
            container.exec(
                f"kafka-topics "
                f"--bootstrap-server localhost:9092 "
                f"--create "
                f"--topic {topic} "
                f"--partitions 3 "
                f"--replication-factor 1",
            )
        yield container

async def kafka_consumer(consumer, timeout=1.0):

    while True:
        msg  = await asyncio.to_thread(consumer.poll,  timeout=timeout)

        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        print('Received message: {}'.format(msg.value().decode('utf-8')))

async def main(consumer):

    task = asyncio.create_task(kafka_consumer(consumer))

    await asyncio.sleep(10)
    print("Cancelling the task...")
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

@pytest.mark.asyncio(loop_scope="function")
async def test_sample(kafka_container):
    consumer = Consumer({
        'bootstrap.servers': kafka_container.get_bootstrap_server(),
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest',
        "debug":"all"})
    consumer.subscribe(['mytopic'])


    await main(consumer)
    consumer.close() 

Runtime environment

Provide a summary of your runtime environment. Which operating system, python version, and docker version are you using? What is the version of testcontainers-python you are using? You can run the following commands to get the relevant information.

[project]
name = "kafka-test"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "confluent-kafka>=2.9.0",
]

[dependency-groups]
dev = [
    "pytest>=8.3.5",
    "pytest-asyncio>=0.26.0",
    "testcontainers>=4.10.0",
]

More info

https://github.com/confluentinc/confluent-kafka-python/issues/1953#issuecomment-2784234429

trajano avatar Apr 07 '25 19:04 trajano

Without the asyncio cancellation it works fine

import asyncio
import time
import sys
from confluent_kafka import Consumer
import pytest
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.kafka import KafkaContainer

@pytest.fixture(scope="module")
def kafka_container():
    """Provide a temporary Redis container."""
    with KafkaContainer("mirror.gcr.io/confluentinc/cp-kafka:7.6.0") as container:
        wait_for_logs(container, "Awaiting socket connections on 0.0.0.0:9093")
        for topic in ["mytopic"]:
            container.exec(
                f"kafka-topics "
                f"--bootstrap-server localhost:9092 "
                f"--create "
                f"--topic {topic} "
                f"--partitions 3 "
                f"--replication-factor 1",
            )
        yield container

async def main(consumer):
    await asyncio.to_thread(consumer.poll,  timeout=10)

@pytest.mark.asyncio(loop_scope="function")
async def test_sample(kafka_container):
    consumer = Consumer({
        'bootstrap.servers': kafka_container.get_bootstrap_server(),
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest',
        "debug":"all"})
    consumer.subscribe(['mytopic'])


    await main(consumer)
    consumer.close()

trajano avatar Apr 07 '25 19:04 trajano