Stream API loses data when using an SSL socket

Open Novelfor opened this issue 6 months ago • 2 comments

What happened (please include outputs or screenshots): I use the stream API to implement "kubectl exec". When I test my code, it sometimes loses data. After debugging, I think the cause is that the Kubernetes client mishandles non-blocking SSL sockets: it relies on poll()/select() alone, which cannot see data that is already buffered inside the SSL layer.

According to the Python documentation on SSL Sockets (https://docs.python.org/3/library/ssl.html#ssl-nonblocking), "SSL socket may still have data available for reading without select() being aware of it".
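A minimal sketch of the pattern the Python docs recommend (call recv() first to drain anything the SSL layer has already buffered, and only block in select() if nothing was there), assuming the socket is already in non-blocking mode. read_available and _drain are hypothetical helper names, not part of the Kubernetes client:

```python
import select
import socket
import ssl


def _drain(sock: socket.socket, bufsize: int) -> bytes:
    """Read everything currently available without blocking."""
    chunks = []
    while True:
        try:
            chunk = sock.recv(bufsize)
        except (ssl.SSLWantReadError, BlockingIOError):
            # SSL sockets raise SSLWantReadError, plain sockets raise
            # BlockingIOError: either way nothing more is buffered right now.
            # (SSLWantWriteError during renegotiation is not handled here.)
            break
        if not chunk:  # empty read: the peer closed the connection
            break
        chunks.append(chunk)
    return b"".join(chunks)


def read_available(sock: socket.socket, timeout: float, bufsize: int = 4096) -> bytes:
    """Drain first; wait in select() only when nothing was already buffered."""
    data = _drain(sock, bufsize)
    if not data:
        select.select([sock], [], [], timeout)
        data = _drain(sock, bufsize)
    return data
```

Because a non-blocking plain socket raises BlockingIOError where an SSL socket raises ssl.SSLWantReadError, the same helper can be tried out with socket.socketpair() without setting up TLS.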

Current code: https://github.com/kubernetes-client/python/blob/3e6cc5871997cd52fd8f4c8324894508f3a75136/kubernetes/base/stream/ws_client.py#L168

It can be fixed by also checking SSLSocket.pending():

    def update(self, timeout=0):
        """Update channel buffers with at most one complete frame of input."""
        if not self.is_open():
            return
        if not self.sock.connected:
            self._connected = False
            return

        # The options here are:
        # select.select() - this will work on most OS, however, it has a
        #                   limitation of only able to read fd numbers up to 1024.
        #                   i.e. does not scale well. This was the original
        #                   implementation.
        # select.poll()   - this will work on most unix based OS, but not as
        #                   efficient as epoll. Will work for fd numbers above 1024.
        # select.epoll()  - newest and most efficient way of polling.
        #                   However, only works on linux.
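        # Data the SSL layer has already decrypted and buffered is invisible
        # to poll()/select(), so check SSLSocket.pending() as well.
        ssl_pending = 0
        if self.sock.is_ssl():
            ssl_pending = self.sock.sock.pending()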

        if hasattr(select, "poll"):
            poll = select.poll()
            poll.register(self.sock.sock, select.POLLIN)
            if timeout is not None:
                timeout *= 1_000  # poll method uses milliseconds as the time unit
            r = poll.poll(timeout)
            poll.unregister(self.sock.sock)
        else:
            r, _, _ = select.select(
                (self.sock.sock, ), (), (), timeout)

        if r or ssl_pending > 0:
            ...  # remainder of update() unchanged (see the linked ws_client.py)
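Note that the snippet above still passes the full timeout to poll() even when ssl_pending > 0, so a frame already sitting in the SSL buffer can wait up to timeout seconds (1 s in the repro script, which calls update(timeout=1.0)) before it is read. A minimal sketch of a variant that checks pending() first and skips the wait; is_readable is a hypothetical helper, not part of kubernetes-client or websocket-client:

```python
import select
import socket


def is_readable(sock, timeout=None):
    """Return True if a read on sock should not block.

    sock may be a plain socket or an ssl.SSLSocket; timeout is in seconds
    (None waits indefinitely), matching WSClient.update().
    """
    # Bytes the SSL layer has already decrypted and buffered never show up
    # on the file descriptor, so poll()/select() cannot see them. Check
    # them first and skip waiting entirely when some are available.
    pending = getattr(sock, "pending", None)
    if pending is not None and pending() > 0:
        return True

    if hasattr(select, "poll"):
        poller = select.poll()
        poller.register(sock, select.POLLIN)
        # poll() takes milliseconds; None means wait indefinitely.
        ms = None if timeout is None else int(timeout * 1000)
        try:
            return bool(poller.poll(ms))
        finally:
            poller.unregister(sock)

    readable, _, _ = select.select([sock], [], [], timeout)
    return bool(readable)


if __name__ == "__main__":
    # Demo with a plain socket pair (no SSL, so pending() is not consulted).
    a, b = socket.socketpair()
    print(is_readable(b, timeout=0))    # False: nothing sent yet
    a.sendall(b"x")
    print(is_readable(b, timeout=1.0))  # True
```

Inside WSClient.update() this would replace the poll/select block, e.g. `if is_readable(self.sock.sock, timeout):` followed by the existing frame-reading code; this is an untested suggestion.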

What you expected to happen:

How to reproduce it (as minimally and precisely as possible): The simplest test is to exec vim, which easily loses the cursor position, but that requires a small TTY client. Here is my code; running vim or htop through it reproduces the data loss easily.

from kubernetes.stream import ws_client
import termios
import pty
import fcntl
import struct
import json
import signal
import tty
import select
import os
import threading
import yaml
import sys
import time
from datetime import datetime, timezone

def append_file(data):
    with open("test.txt", "a") as f:
        now = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        f.write(f"{now} {data}\n")

class InteractiveShell:
    def __init__(self, client: ws_client.WSClient, has_stdin=True, has_tty=True, outfile=None, errfile=None):
        self.client = client
        self.has_stdin = has_stdin
        self.has_tty = has_tty
        self.master_fd = None
        self.keep_ping = threading.Thread(target=self._keep_ping, daemon=True)
        self.keep_ping.start()
        if errfile is None:
            self.errfile = pty.STDERR_FILENO
        else:
            self.errfile = errfile
        if outfile is None:
            self.outfile = pty.STDOUT_FILENO
        else:
            self.outfile = outfile

    def _keep_ping(self):
        while True:
            try:
                self.client.write_channel(6, "ping")
                time.sleep(60 * 10)
            except Exception as e:
                break

    def _set_pty_size(self, a=None, b=None):
        """
        Sets the window size of the child pty based on the window size of
               our own controlling terminal.
        """
        if not self.has_tty:
            return
        packed = fcntl.ioctl(pty.STDOUT_FILENO,
                             termios.TIOCGWINSZ,
                             struct.pack('HHHH', 0, 0, 0, 0))
        rows, cols, h_pixels, v_pixels = struct.unpack('HHHH', packed)
        self.client.write_channel(ws_client.RESIZE_CHANNEL, json.dumps({"Height": rows, "Width": cols}))

    def spawn(self, argv=None):
        if self.has_tty:
            old_handler = signal.signal(signal.SIGWINCH, self._set_pty_size)
            try:
                self.old_settings = tty.tcgetattr(pty.STDIN_FILENO)
                tty.setraw(pty.STDIN_FILENO)
            except tty.error:
                pass
        self._set_pty_size()
        ret_data = None
        returncode = -1
        try:
            ret_data = self.main_loop()
        finally:
            if self.has_tty:
                termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)
            if ret_data is None:
                err = self.client.read_channel(ws_client.ERROR_CHANNEL)
                ret_data = yaml.safe_load(err)
            if ret_data is None or ret_data['status'] == "Success":
                returncode = 0
            else:
                returncode = int(ret_data['details']['causes'][0]['message'])
        return returncode
    
    def forward_stdin_thread(self):
        while True:
            rfds, _, _ = select.select([pty.STDIN_FILENO], [], [], 1.0)
            if len(rfds) == 0:
                continue
            
            if pty.STDIN_FILENO in rfds and self.has_stdin:
                data = os.read(pty.STDIN_FILENO, 1024)
                append_file(f"STDIN: {data}")
                if data:
                    # Debugging hook: typing "0" restores the terminal and opens an IPython shell.
                    if data == b"0":
                        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)
                        from IPython import embed
                        embed()
                        op_code, rdata = self.client.sock.recv_data_frame(False)
                        append_file(f"0 received, op_code: {op_code}, data: {rdata}")
                    self.client.write_stdin(data)
                else:
                    break

    def main_loop(self):
        forward_thread = threading.Thread(target=self.forward_stdin_thread, daemon=True)
        forward_thread.start()
        # Stop once the websocket closes; otherwise this loop would spin forever.
        while self.client.is_open():
            self.client.update(timeout=1.0)
            if self.client.peek_channel(ws_client.STDOUT_CHANNEL):
                data = self.client.read_channel(ws_client.STDOUT_CHANNEL)
                if data:
                    append_file(f"STDOUT: {data}")  # append_file already adds the newline
                    self.write_stdout(data)
            elif self.client.peek_channel(ws_client.STDERR_CHANNEL):
                error_data = self.client.read_channel(ws_client.STDERR_CHANNEL)
                if error_data:
                    self.write_stderr(error_data)
            elif self.client.peek_channel(ws_client.RESIZE_CHANNEL):
                resize_data = self.client.read_channel(ws_client.RESIZE_CHANNEL)
                if resize_data:
                    resize_info = json.loads(resize_data)
                    rows = resize_info.get("Height", 24)
                    cols = resize_info.get("Width", 80)
            elif self.client.peek_channel(ws_client.ERROR_CHANNEL):
                error_data = self.client.read_channel(ws_client.ERROR_CHANNEL)
                error_msg = yaml.safe_load(error_data)
                return error_msg

    def write_stdout(self, data):
        os.write(self.outfile, data)

    def write_stderr(self, data):
        os.write(self.errfile, data)

    def forward_stdin(self, data):
        assert self.client is not None
        self.client.write_stdin(data)

if __name__ == "__main__":
    from kubernetes import client, config
    from kubernetes.stream import stream
    pod_name = "tlaunch-4fc21ba6-0"
    namespace = "ws-xingyuan"

    config.load_kube_config("/etc/kube.conf")
    core_api = client.CoreV1Api()

    client = stream(
        core_api.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        command="zsh",
        stderr=True,
        stdin=True,
        stdout=True,
        tty=True,
        _preload_content=False,
        binary=True
    )

    shell = InteractiveShell(client, has_stdin=True, has_tty=True)
    return_code = shell.spawn()

I also wrote a test.py, but with it the problem is harder to reproduce. Exec the script, then enter a line of input; each time, it sends the current time back. Occasionally (roughly once every 30 to 50 inputs, depending on host CPU speed), the output is not received until the next input.

from datetime import datetime, timezone
import sys

if __name__ == "__main__":
    try:
        while True:
            input()
            print(datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f") + "padding test", flush=True)
    except KeyboardInterrupt:
        sys.exit(254)

Anything else we need to know?:

Environment:

Novelfor, Jun 12 '25 08:06

Thanks for the detailed report @Novelfor! Would you like to send a PR to fix? /help

roycaihw, Jul 16 '25 20:07

@roycaihw: This request has been marked as needing help from a contributor.

Guidelines

Please ensure that the issue body includes answers to the following questions:

  • Why are we solving this issue?
  • To address this issue, are there any code changes? If there are code changes, what needs to be done in the code and what places can the assignee treat as reference points?
  • How can the assignee reach out to you for help?

For more details on the requirements of such an issue, please see here and ensure that they are met.

If this request no longer meets these requirements, the label can be removed by commenting with the /remove-help command.

k8s-ci-robot, Jul 16 '25 20:07

The current v4 WebSocket protocol has a known issue that can cause a similar effect (https://github.com/kubernetes/kubernetes/issues/89899), so we had to implement a new v5 protocol: https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4006-transition-spdy-to-websockets#proposal-new-remotecommand-sub-protocol-version---v5channelk8sio

Should we first try to implement this new v5 protocol?

aojea, Dec 14 '25 20:12
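For context on what a v5 implementation would touch, here is a rough sketch of the channel framing. It assumes the layout used by the existing ws_client.py (each binary message is one stream-ID byte followed by the payload; 0 stdin, 1 stdout, 2 stderr, 3 error, 4 resize) and, for v5.channel.k8s.io, the KEP's description of a CLOSE signal as the byte 255 followed by the ID of the stream to close. The helper names are hypothetical; verify the byte values against the KEP linked above.

```python
from typing import Tuple

# Stream IDs used by the channel.k8s.io family of subprotocols
# (same values as the constants in kubernetes/stream/ws_client.py).
STDIN, STDOUT, STDERR, ERROR, RESIZE = 0, 1, 2, 3, 4

# v5.channel.k8s.io only: first byte of a CLOSE signal (assumed from the KEP).
CLOSE_SIGNAL = 255


def encode_frame(channel: int, payload: bytes) -> bytes:
    """Prefix the payload with its one-byte stream ID."""
    return bytes([channel]) + payload


def encode_close(channel: int) -> bytes:
    """Build a v5 CLOSE signal: no more data will be sent on this stream."""
    return bytes([CLOSE_SIGNAL, channel])


def decode_frame(message: bytes) -> Tuple[int, bytes]:
    """Split a received binary message into (stream ID, payload)."""
    if not message:
        raise ValueError("empty message has no stream ID")
    return message[0], message[1:]
```

Per the KEP, the CLOSE signal lets a client say it has finished writing to one stream (for example stdin) without closing the whole WebSocket; under v5 a client could send encode_close(STDIN) after its last stdin write.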