
Client spams with warnings "Unable to send to wakeup socket" endlessly

Open Prometheus3375 opened this issue 5 years ago • 15 comments

kafka-python version: 2.0.1
Python version: 3.7.7

This issue is a duplicate of #1842; that issue is closed and I cannot reopen it, so I opened a new one.

When I close a spider, kafka-python starts to spam the "Unable to send to wakeup socket" warning and does not stop. See the attached spider.log file.

I went to the sources and added one line which "fixes" this issue. Here is the original code.

# Source: kafka/client_async.py
# Class: KafkaClient

    def wakeup(self):
        with self._wake_lock:
            try:
                self._wake_w.sendall(b'x')
            except socket.timeout:
                log.warning('Timeout to send to wakeup socket!')
                raise Errors.KafkaTimeoutError()
            except socket.error:
                log.warning('Unable to send to wakeup socket!')

This is the fixed version.

    def wakeup(self):
        with self._wake_lock:
            try:
                self._wake_w.sendall(b'x')
            except socket.timeout:
                log.warning('Timeout to send to wakeup socket!')
                raise Errors.KafkaTimeoutError()
            except socket.error:
                log.warning('Unable to send to wakeup socket!')
                raise  # re-raise instead of silently swallowing the error

I do not know what causes the problem or why raising the exception stops the spam.
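For context, kafka-python's `KafkaClient` creates its wakeup channel with `socket.socketpair()`; once the reading end has been torn down during shutdown, every subsequent write fails with `socket.error`, which could explain the repeated warning. A minimal reproduction of that failure mode with plain sockets (a sketch of the mechanism, not kafka-python's actual shutdown path):

```python
import socket

# kafka-python creates a connected pair like this for its wakeup channel.
r, w = socket.socketpair()

r.close()  # simulate the reading end being torn down on close()

failed = False
try:
    w.sendall(b'x')  # the wakeup write now fails on every call
except OSError:      # socket.error is an alias of OSError
    failed = True
finally:
    w.close()

print(failed)  # True
```

If `wakeup()` is called in a loop, swallowing this error means the loop never learns the channel is gone, which matches the endless-warning symptom.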

Prometheus3375 · Jul 06 '20

I am seeing the same behavior with nameko_kafka which uses this lib internally.

joaomcarlos · Jul 15 '20

Having the same issue with the same lib version and Python version, in a Docker container using python:3.7-slim-stretch.

Miggets7 · Jul 28 '20

I have the same behavior in 2.0.1.

peilingkuang · Aug 17 '20

Having the same issue with the same kafka-python version, 2.0.1.

tuofang · Aug 17 '20

Having the same issue, also with kafka-python 2.0.1. I don't know what causes it. After a reboot it stops for a while, but it seems to get triggered again when receiving data.

lennstof · Sep 07 '20

Is there any progress on this issue? I am also hitting it, and I am using 1.4.7, where #1842 says it is fixed.

From @dpkp "I believe this is fixed in the latest release, 1.4.7. Please reopen if the issue persists."

@Prometheus3375 proposed a fix above. Can it be reviewed for picking up?

ninakka · Feb 04 '21

This problem occasionally occurs when closing a Kafka connection. Is there a mistake somewhere?

FANGOD · Mar 16 '21

Confirmed. The issue persists when trying to close the consumer's connection to Kafka inside a Docker container.

forgoty · May 07 '21

Same here--this happens as soon as I try to gracefully shut down a consumer. As it stands, I have to forcefully kill the process to get it to stop.

mhworth · May 09 '21

Same.

> Same here--this happens as soon as I try to gracefully shut down a consumer. As it stands, I have to forcefully kill the process to get it to stop.

+1

vlaskinvlad · Jun 14 '21

I found another way to solve this problem without modifying the source code of kafka-python.

    def run(self) -> Iterator[Event]:
        self._consumer = KafkaConsumer(
            self._kafka_topic,
            bootstrap_servers=self._kafka_server,
            value_deserializer=self.data_deserializer,
            # Stop blocking after 16 s so the stop flag below
            # is re-checked periodically.
            consumer_timeout_ms=16000)

        while not self._stop_receiver:
            # The iterator ends (without raising) once
            # consumer_timeout_ms elapses with no messages.
            for msg in self._consumer:
                if isinstance(msg.value, Event):
                    yield msg.value

    def close(self):
        if self._consumer is not None:
            self._stop_receiver = True
            # Give run() a moment to leave the iteration loop
            # before tearing the consumer down.
            time.sleep(1)
            self._consumer.unsubscribe()
            self._consumer.close()

NitroCao · Nov 08 '21

> I found another way to solve this problem without modifying source code of Kafka-python. […]

It works. It seems consumer_timeout_ms is the key point.
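A sketch of why `consumer_timeout_ms` helps: it bounds how long the consumer iterator blocks, so the surrounding loop gets regular chances to observe the stop flag instead of hanging inside a blocking call while `close()` tears the connection down underneath it. The same pattern, with a plain `queue.Queue` standing in for the consumer so the sketch runs without a broker (the names here are illustrative, not kafka-python API):

```python
import queue
import threading

def run(source: queue.Queue, stop: threading.Event) -> list:
    """Drain `source` until `stop` is set; the 0.1 s timeout plays the
    role of consumer_timeout_ms and bounds each blocking wait."""
    results = []
    while not stop.is_set():
        try:
            results.append(source.get(timeout=0.1))
        except queue.Empty:
            continue  # timed out: re-check the stop flag, then wait again

    return results

q = queue.Queue()
stop = threading.Event()
q.put("a")
q.put("b")
threading.Timer(0.3, stop.set).start()  # request shutdown shortly after

got = run(q, stop)
print(got)  # ['a', 'b']
```

Without the timeout, `source.get()` would block forever on an empty queue and the stop flag would never be seen, which mirrors a consumer stuck in a blocking poll during shutdown.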

glorinli · Jul 19 '23

I consistently ran into this when accidentally using a closed consumer; the underlying socket exception was "Bad file descriptor", should someone run into the same issue. The error was on my part, but a sanity check with a nicer error message could have saved me an hour or so of debugging.

aiven-anton · Sep 28 '23

@aiven-anton What was the fix for you, I am getting the same "Bad file descriptor"

adityaraj-28 · Oct 10 '23

@adityaraj-28

We have a custom context manager for setting up and closing a consumer, and I was using the consumer after it had been closed. The fix is simply not to do that. The actual code was more complex, but simplified it boils down to this:

    with create_consumer() as consumer:
        # Use consumer here.
        ...

    # If you use it here, outside the managed context, it's closed
    # and you get a bad file descriptor error.
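For reference, such a context manager can be sketched like this; `create_consumer` follows the commenter's naming, and the stand-in `_Consumer` class plus its explicit use-after-close check are assumptions for illustration, not kafka-python behavior:

```python
from contextlib import contextmanager

class _Consumer:
    """Stand-in for kafka.KafkaConsumer so the sketch runs without a broker."""
    def __init__(self):
        self.closed = False

    def poll(self):
        if self.closed:
            # A loud, descriptive error instead of a low-level
            # "Bad file descriptor" from the underlying socket.
            raise RuntimeError("consumer is closed")
        return {}

    def close(self):
        self.closed = True

@contextmanager
def create_consumer():
    consumer = _Consumer()
    try:
        yield consumer
    finally:
        consumer.close()  # closed on exit, even if the body raised

with create_consumer() as consumer:
    consumer.poll()  # fine inside the managed context

try:
    consumer.poll()  # outside the context the consumer is closed
except RuntimeError as e:
    print(e)  # consumer is closed
```

The guard in `poll()` is exactly the kind of sanity check the comment above asks for: use-after-close fails with a clear message instead of surfacing as socket-level errors.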

aiven-anton · Oct 10 '23