
Segmentation fault when trying to commit a message after max.poll.interval.ms has passed without polling

Open abdallahashraf22 opened this issue 1 year ago • 1 comments

Description

When trying to commit a message after not polling for longer than max.poll.interval.ms, I get a non-recoverable segmentation fault that I can't handle in an exception handler, causing the Python interpreter to exit.

How to reproduce

import json
import logging

from confluent_kafka import Consumer

logger = logging.getLogger(__name__)

consumer = Consumer({
    "bootstrap.servers": cls._bootstrap_servers,
    "group.id": group_id,
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest"
})
consumer.subscribe([topic_name])
while True:
    logger.info("polling...")
    message = consumer.poll(timeout=10.0)
    if message is None:
        continue
    consumer.pause(consumer.assignment())
    logger.info("consumer has been paused")
    try:
        output = message.value()
        logger.info(f"received message from kafka {output}")
        output = json.loads(output)
        logger.info("-------Consumed Message------")
        logger.info(output)
        # this processing takes more than max.poll.interval.ms (5 minutes by default)
        processing_output_function(output)
    except TypeError as e:
        logger.error(f"json type error: {e}")
    except json.decoder.JSONDecodeError as e:
        logger.error(f"json decode error: {e}")
    except Exception as e:
        logger.error(f"JAIS General Exception: {e}")
    finally:
        logger.info("committing message")
        consumer.commit(message)
        logger.info("message has been committed")
        consumer.resume(consumer.assignment())
        logger.info("consumer has been resumed")

In the snippet above, if processing_output_function(output) takes more than max.poll.interval.ms, that particular loop iteration ends correctly and commits the message. Then, on the next iteration, I consume a message that says

Application maximum poll interval (300000ms) exceeded by 231ms

Decoding this fails with a json.decoder.JSONDecodeError exception. When the finally block then goes to commit the message, the log says "committing message" followed by a segmentation fault,

and I never reach the "message has been committed" line.

I'm not sure whether this is working as intended, but it seems wrong that it stops my Python execution with no way to handle the exit in an exception handler. For now I've increased the polling max time as a workaround, but I can fetch more logs later if requested.
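For reference, a minimal sketch of that workaround, just raising the limit above the longest expected processing time (the 15-minute value here is illustrative, not the value I actually used):

import logging

from confluent_kafka import Consumer

logger = logging.getLogger(__name__)

consumer = Consumer({
    "bootstrap.servers": cls._bootstrap_servers,
    "group.id": group_id,
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
    # must exceed the slowest processing_output_function run; 15 min is a guess
    "max.poll.interval.ms": 900000,
})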

Checklist

Please provide the following information:

  • [x] confluent-kafka-python: ('2.3.0', 33751040)
  • [x] librdkafka version: ('2.3.0', 33751295)
  • [x] Apache Kafka broker version: 2.13-2.8.1
  • [x] Client configuration: {...}: "enable.auto.commit": False, "auto.offset.reset": "earliest"
  • [x] Operating system: linux
  • [x] Critical issue: yes, I think so

abdallahashraf22 avatar Feb 06 '24 13:02 abdallahashraf22

Confirmed the problem is not caused by anything in the processing output function; I reproduced it with time.sleep:

import json
import logging
import time

from confluent_kafka import Consumer

logger = logging.getLogger(__name__)

config = {
    "bootstrap.servers": "some host",
    "group.id": "some group id",
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
    "max.poll.interval.ms": 60000,
}
consumer = Consumer(config)
consumer.subscribe(["some_topic"])
while True:
    logger.info("polling...")
    message = consumer.poll(timeout=10.0)
    if message is None:
        continue
    consumer.pause(consumer.assignment())
    logger.info("consumer has been paused")
    try:
        output = message.value()
        logger.info(f"received message from kafka {output}")
        output = json.loads(output)
        if isinstance(output, dict) and output.get("task") == "sleep":
            time.sleep(65)  # sleep past max.poll.interval.ms (60s)
        logger.info("-------Consumed Message------")
        logger.info(output)
    except TypeError as e:
        logger.error(f"json type error: {e}")
    except json.decoder.JSONDecodeError as e:
        logger.error(f"json decode error: {e}")
    except Exception as e:
        logger.error(f"General Exception: {e}")
    finally:
        logger.info("committing message")
        consumer.commit(message)
        logger.info("message has been committed")
        consumer.resume(consumer.assignment())
        logger.info("consumer has been resumed")

Logs:

INFO:     __main__|polling...
INFO:     __main__|polling...
INFO:     __main__|polling...
INFO:     __main__|consumer has been paused
INFO:     __main__|received message from kafka b'{"id": "testing 2", "task": "sleep", "text": "something"}'
INFO:     __main__|committing message
INFO:     __main__|message has been committed
INFO:     __main__|received message from kafka b'Application maximum poll interval (60000ms) exceeded by 500ms'
ERROR:    __main__|json decode error: Expecting value: line 1 column 1 (char 0)
INFO:     __main__|committing message
Segmentation fault (core dumped)
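For what it's worth, checking message.error() right after poll() distinguishes a real record from an error event like the one above (a sketch; that the error here maps to librdkafka's MAX_POLL_EXCEEDED code is my assumption):

while True:
    message = consumer.poll(timeout=10.0)
    if message is None:
        continue
    if message.error():
        # for the case above, this should be librdkafka's MAX_POLL_EXCEEDED error
        logger.error(f"poll() returned an error event, not a record: {message.error()}")
        continue  # never json.loads() or commit() an error event
    ...  # normal processing of a real record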

abdallahashraf22 avatar Feb 08 '24 10:02 abdallahashraf22

I am able to reproduce this issue.

The output of consumer.poll(timeout=10.0) can be a valid message or it can contain an error. In your case it contains an error, and you are trying to commit that error message. Error messages don't have a valid topic name or offset, hence the SegFault when trying to commit.

Check the valid usage in the Consumer example:

    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())
    finally:
        # Close down consumer to commit final offsets
        c.close()
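Applied to the loop from this issue, the guard would look roughly like this (a sketch; handle_record() is a hypothetical stand-in for your processing logic):

while True:
    message = consumer.poll(timeout=10.0)
    if message is None:
        continue
    if message.error():
        # e.g. the max.poll.interval.ms notification: it has no valid
        # topic/partition/offset, so it must never reach consumer.commit()
        raise KafkaException(message.error())
    consumer.pause(consumer.assignment())
    try:
        handle_record(json.loads(message.value()))  # hypothetical processing
    finally:
        consumer.commit(message)  # safe: only real records reach here
        consumer.resume(consumer.assignment())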

pranavrth avatar May 27 '24 14:05 pranavrth

I think we shouldn't throw a SegFault in this case, even though this is not the correct usage.

pranavrth avatar May 27 '24 18:05 pranavrth

ah I see, I had the wrong expectations of the error behaviour, sorry

and yeah, I agree it still shouldn't fall through to a segmentation fault at the Python level; it should be possible to catch it as an exception

thanks for the fix, once this PR gets merged feel free to close the issue

abdallahashraf22 avatar Jun 24 '24 08:06 abdallahashraf22