Transport endpoint is not connected
` During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 241, in _flush_queued_events self._send_events(events) File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 304, in _send_events self._transport.send(events, use_logging=use_logging) File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 103, in send self._close() File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 138, in _close self._sock.shutdown(socket.SHUT_WR) OSError: [Errno 107] Transport endpoint is not connected [2024-06-06 18:55:20,746] [ERROR] [LogProcessingWorker._safe_log_impl] An error occurred while sending events: [Errno 107] Transport endpoint is not connected Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 138, in _fetch_events self._fetch_event() File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 160, in _fetch_event self._event = self._queue.get(block=False) File "/usr/lib/python3.10/queue.py", line 168, in get raise Empty _queue.Empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 241, in _flush_queued_events self._send_events(events) File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 304, in _send_events self._transport.send(events, use_logging=use_logging) File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 103, in send self._close() File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 138, in _close self._sock.shutdown(socket.SHUT_WR) OSError: [Errno 107] Transport endpoint is not connected [2024-06-06 18:55:22,748] [ERROR] [LogProcessingWorker._safe_log_impl] An error occurred while sending events: [Errno 107] Transport endpoint is not connected Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 138, in _fetch_events self._fetch_event() File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 160, in _fetch_event self._event = self._queue.get(block=False) File "/usr/lib/python3.10/queue.py", line 168, in get raise Empty _queue.Empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 241, in _flush_queued_events self._send_events(events) File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 304, in _send_events self._transport.send(events, use_logging=use_logging) File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 103, in send self._close() File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 138, in _close self._sock.shutdown(socket.SHUT_WR) OSError: [Errno 107] Transport endpoint is not connected [2024-06-06 18:55:24,750] [ERROR] [LogProcessingWorker._safe_log_impl] An error occurred while sending events: [Errno 107] Transport endpoint is not connected Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 138, in _fetch_events self._fetch_event() File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 160, in _fetch_event self._event = self._queue.get(block=False) File "/usr/lib/python3.10/queue.py", line 168, in get raise Empty _queue.Empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 241, in _flush_queued_events self._send_events(events) File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 304, in _send_events self._transport.send(events, use_logging=use_logging) File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 103, in send self._close() File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 138, in _close self._sock.shutdown(socket.SHUT_WR) OSError: [Errno 107] Transport endpoint is not connected [2024-06-06 18:55:26,752] [ERROR] [LogProcessingWorker._safe_log_impl] An error occurred while sending events: [Errno 107] Transport endpoint is not connected Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 138, in _fetch_events self._fetch_event() File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 160, in _fetch_event self._event = self._queue.get(block=False) File "/usr/lib/python3.10/queue.py", line 168, in get raise Empty _queue.Empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 241, in _flush_queued_events self._send_events(events) File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 304, in _send_events self._transport.send(events, use_logging=use_logging) File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 103, in send self._close() File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 138, in _close self._sock.shutdown(socket.SHUT_WR) OSError: [Errno 107] Transport endpoint is not connected [2024-06-06 18:55:28,754] [ERROR] [LogProcessingWorker._safe_log_impl] An error occurred while sending events: [Errno 107] Transport endpoint is not connected Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 138, in _fetch_events self._fetch_event() File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 160, in _fetch_event self._event = self._queue.get(block=False) File "/usr/lib/python3.10/queue.py", line 168, in get raise Empty _queue.Empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 241, in _flush_queued_events self._send_events(events) File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 304, in _send_events self._transport.send(events, use_logging=use_logging) File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 103, in send self._close() File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 138, in _close self._sock.shutdown(socket.SHUT_WR) OSError: [Errno 107] Transport endpoint is not connected [2024-06-06 18:55:30,756] [ERROR] [LogProcessingWorker._safe_log_impl] An error occurred while sending events: [Errno 107] Transport endpoint is not connected Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 138, in _fetch_events self._fetch_event() File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 160, in _fetch_event self._event = self._queue.get(block=False) File "/usr/lib/python3.10/queue.py", line 168, in get raise Empty _queue.Empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 241, in _flush_queued_events self._send_events(events) File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 304, in _send_events self._transport.send(events, use_logging=use_logging) File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 103, in send self._close() File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 138, in _close self._sock.shutdown(socket.SHUT_WR) OSError: [Errno 107] Transport endpoint is not connected [2024-06-06 18:55:32,759] [ERROR] [LogProcessingWorker._safe_log_impl] An error occurred while sending events: [Errno 107] Transport endpoint is not connected Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 138, in _fetch_events self._fetch_event() File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 160, in _fetch_event self._event = self._queue.get(block=False) File "/usr/lib/python3.10/queue.py", line 168, in get raise Empty _queue.Empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 241, in _flush_queued_events self._send_events(events) File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 304, in _send_events self._transport.send(events, use_logging=use_logging) File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 103, in send self._close() File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 138, in _close self._sock.shutdown(socket.SHUT_WR) OSError: [Errno 107] Transport endpoint is not connected [2024-06-06 18:55:34,761] [ERROR] [LogProcessingWorker._safe_log_impl] An error occurred while sending events: [Errno 107] Transport endpoint is not connected Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 138, in _fetch_events self._fetch_event() File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 160, in _fetch_event self._event = self._queue.get(block=False) File "/usr/lib/python3.10/queue.py", line 168, in get raise Empty _queue.Empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 241, in _flush_queued_events self._send_events(events) File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 304, in _send_events self._transport.send(events, use_logging=use_logging) File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 103, in send self._close() File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 138, in _close self._sock.shutdown(socket.SHUT_WR) OSError: [Errno 107] Transport endpoint is not connected [2024-06-06 18:55:36,763] [ERROR] [LogProcessingWorker._safe_log_impl] An error occurred while sending events: [Errno 107] Transport endpoint is not connected Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 138, in _fetch_events self._fetch_event() File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 160, in _fetch_event self._event = self._queue.get(block=False) File "/usr/lib/python3.10/queue.py", line 168, in get raise Empty _queue.Empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 241, in _flush_queued_events self._send_events(events) File "/usr/local/lib/python3.10/dist-packages/logstash_async/worker.py", line 304, in _send_events self._transport.send(events, use_logging=use_logging) File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 103, in send self._close() File "/usr/local/lib/python3.10/dist-packages/logstash_async/transport.py", line 138, in _close self._sock.shutdown(socket.SHUT_WR) OSError: [Errno 107] Transport endpoint is not connected` Logstash has data and can write data normally, but this issue is frequently prompted. error 500/30s
This looks like https://github.com/eht16/python-logstash-async/issues/89 which should be fixed in 3.0.0. What version do you use?
I've got a question regarding this issue, because it happens to me as well, and I try to check if I understand it correctly:
In https://github.com/eht16/python-logstash-async/blob/master/logstash_async/worker.py#L226 there's a bit of code that handles exception while sending - if there's any exception, it will requeue the messages.
try:
events = [event['event_text'] for event in queued_events]
self._send_events(events)
# Log connection and network errors as warnings as they are rather harmless
except NETWORK_EXCEPTIONS as exc:
self._safe_log(
'warning',
'An error occurred while sending events: %s',
exc)
self._database.requeue_queued_events(queued_events)
break
except Exception as exc:
self._safe_log(
'exception',
'An error occurred while sending events: %s',
exc,
exc=exc)
self._database.requeue_queued_events(queued_events)
break
But in my case, and the example above, the exception is thrown on self._sock.shutdown(socket.SHUT_WR), which for UdpTransport happens here: https://github.com/eht16/python-logstash-async/blob/master/logstash_async/transport.py#L133
If i get it right, it means that whenever the socket can't be closed (because for example it's already gone OS-wise) events are requeued, despite the fact they were sucessfully sent, yes? That's what I seem to be observing, as I get a stream of duplicate events using UdpTransport, that can be only stopped with container restart and them being dropped from the in-memory queue.
@micurbanski @sibo-git Thanks for the analysis. Could you have a look at #100 if it makes things better? I'm not sure whether this is really a good idea, it will simply ignore the error. I see that requeueing already sent events is annoying but at this point, especially with UdpTransport, we probably can never know if the events were really sent or if they are still on the sending side in the socket queue or anywhere in between.
But maybe it's good enough.
Ok, it seems to help, meaning, the events are not requeued on socket close error, and we obeserve that we don't get duplicates anymore. So thanks for that!
Just one more thing, since it still bogs down our Sentry with that Error on closing the transport socket warning - can it be made configurable? I understand that it is important to throw warning by default, but we would like to be able to made conscious decision about not throwing that warning :)
@micurbanski I don't want to add yet another configuration option for this very special case. Instead I moved the logging call into a seperate method and so you can easily subclass the transport class and override the method.
I think it is still interesting what is causing those errors so frequent in the first place.
If you agree, I will merge the changes and release soon.
@eht16, certainly, that might be actually even better solution, greatly appreciated.
@micurbanski the changes just have been released as 4.0.0.
I'm closing this issue now as I think it's much better now. In case of further problems, just create a new issue.