
asyncio socket write a multi-dimensional memoryview could truncate data

Open codingl2k1 opened this issue 6 months ago • 11 comments

Bug report

Bug description:

For a multi-dimensional memory view, if the socket sends only part of the memory view, then the view will be truncated to an empty slice. As a result, the write logic finishes by sending only part of the data.
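The truncation can be seen without asyncio or NumPy. The sketch below is illustrative only. It builds a C-contiguous three-dimensional view with memoryview.cast (a stand-in for the NumPy array in the report, using 2-byte 'H' items instead of float16) and applies the same memoryview(data)[n:] step that write() performs. The value n = 8192 is the byte count reported by the first send in the output further down.

```python
# Illustrative only: reproduces the slicing step, not the transport itself.
payload = bytes(120_000)

# A 3-D view of 2-byte items with shape (1, 300, 200), like the array in the report.
mv = memoryview(payload).cast('H', shape=[1, 300, 200])

n = 8192  # bytes accepted by the first socket.send() call in the report

# Slicing a multi-dimensional view indexes the FIRST dimension, which has
# length 1 here, so [8192:] selects nothing at all.
rest = memoryview(mv)[n:]
print(len(mv), mv.nbytes)      # 1 120000
print(len(rest), rest.nbytes)  # 0 0

# Casting to one-dimensional bytes first makes the slice count bytes.
flat = mv.cast('B')
print(flat[n:].nbytes)         # 111808
```

Because the sliced view is empty, the `if not data: return` branch treats the write as complete even though 111808 bytes were never sent.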

class _SelectorSocketTransport(_SelectorTransport):

    def write(self, data):
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if self._empty_waiter is not None:
            raise RuntimeError('unable to write; sendfile is in progress')
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Optimization: try to send now.
            try:
                print(f"send1 {self._sock} {data.nbytes if isinstance(data, memoryview) else len(data)}")
                if isinstance(data, memoryview):
                    print(data.shape, data.ndim, data.strides, data.suboffsets)
                n = self._sock.send(data)
                print(f"send2 {self._sock} {n} {len(bytes(data))}")
            except (BlockingIOError, InterruptedError):
                pass
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(exc, 'Fatal write error on socket transport')
                return
            else:
                data = memoryview(data)[n:]
                print(f"send3 {bool(data)}")
                if not data:
                    return
            # Not all was written; register write handler.
            self._loop._add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.append(data)
        self._maybe_pause_protocol()

Sample code to reproduce:

import asyncio
import tempfile
import numpy as np

q = asyncio.Queue()
arr = np.zeros((1, 300, 200), dtype=np.float16)
mv = memoryview(arr)
print(
    len(mv),
    mv.shape,
    mv.format,
    mv.ndim,
    mv.strides,
    mv.contiguous,
    mv.c_contiguous,
    mv.f_contiguous,
)


async def handle_echo(reader, writer):
    print(f"readexactly: {mv.nbytes}")
    data = await reader.readexactly(mv.nbytes)
    print(f"readexactly done: {len(data)}")
    writer.write(b"OK")
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def test_server():
    # Create a temporary file to get a unique path and then delete it
    with tempfile.NamedTemporaryFile(delete=True) as tmp:
        socket_path = tmp.name
    server = await asyncio.start_unix_server(handle_echo, path=socket_path)
    addr = socket_path
    await q.put(addr)
    print(f"Serving on {addr}")

    async with server:
        await server.serve_forever()


async def _test_send_multi_dimension_memoryview():
    svr = asyncio.create_task(test_server())
    try:
        addr = await q.get()

        reader, writer = await asyncio.open_unix_connection(addr)

        writer.write(mv)
        await writer.drain()
        ok = await reader.readexactly(2)
        print(ok)
    finally:
        svr.cancel()


asyncio.run(_test_send_multi_dimension_memoryview())

Output (hang):

1 (1, 300, 200) e 3 (120000, 400, 2) True True False
Serving on /var/folders/r6/h3hc6kj91s9czcyds6fh0yqh0000gn/T/tmp_9d7k3co
send1 <socket.socket fd=7, family=1, type=1, proto=0, raddr=/var/folders/r6/h3hc6kj91s9czcyds6fh0yqh0000gn/T/tmp_9d7k3co> 120000
(1, 300, 200) 3 (120000, 400, 2) ()
send2 <socket.socket fd=7, family=1, type=1, proto=0, raddr=/var/folders/r6/h3hc6kj91s9czcyds6fh0yqh0000gn/T/tmp_9d7k3co> 8192 120000
send3 False
readexactly: 120000

The test case should return without hanging.

CPython versions tested on:

3.11 & 3.12 & main(bda121862e7d7f4684d9f0281f7d1f408c0c740c)

Operating systems tested on:

macOS

Linked PRs

  • gh-135974

codingl2k1 • Jun 23 '25 20:06

This reproduces on HEAD. Note partial writes of memoryviews of non-byte arrays are also affected!
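A small sketch of why one-dimensional views of non-byte items are affected too: the slice index counts items, while send() returns a count of bytes. The value n = 1000 is a hypothetical partial-send result chosen for illustration, not a figure from this thread.

```python
import array

# A 1-D view whose items are wider than one byte.
arr = array.array('i', range(1024))
mv = memoryview(arr)
total = mv.nbytes

n = 1000  # hypothetical number of BYTES accepted by a partial send()

# What the transport does: the index is interpreted as an ITEM count.
wrong = mv[n:]
# What was intended: skip n bytes of the underlying buffer.
right = mv.cast('B')[n:]

print("bytes still unsent:     ", total - n)
print("bytes kept by mv[n:]:   ", wrong.nbytes)
print("bytes kept after cast:  ", right.nbytes)
```

The item-based slice keeps far fewer bytes than remain unsent, so the tail of the stream is silently dropped rather than the write stopping with an error.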

duaneg • Jun 26 '25 10:06

In case it is useful to anyone, a reproducer using array.array (doesn't require numpy):

import array
import asyncio
import hashlib
import socket
import tempfile

done = asyncio.Event()
socket_path = tempfile.mktemp()

async def send(writer, mv):
    m = hashlib.sha256()
    m.update(mv)
    writer.write(mv)
    await writer.drain()
    print(f"w {len(mv) * mv.itemsize} {m.hexdigest()}")
    writer.close()

def receive(count):
    async def receiver(reader, writer):
        m = hashlib.sha256()
        read = 0
        while True:
            data = await reader.read(count - read)
            if data == b'':
                break

            m.update(data)
            read += len(data)
            if read >= count:
                break
        print(f"r {read} {m.hexdigest()}")
        writer.close()
        done.set()
    return receiver

async def main():
    arr = array.array('l', range(1024))
    mv = memoryview(arr)
    nbytes = len(mv) * mv.itemsize

    server = await asyncio.start_unix_server(receive(nbytes), path=socket_path)
    serving = asyncio.create_task(server.serve_forever())
    await asyncio.sleep(0)

    _, writer = await asyncio.open_unix_connection(path=socket_path)
    writer._transport._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
    await send(writer, mv)
    await done.wait()
    server.close()
    await server.wait_closed()

asyncio.run(main())

duaneg • Jun 26 '25 11:06

Thanks. Your fix PR works for me.

codingl2k1 • Jun 26 '25 11:06

The code already checks for "data argument must be a bytes-like object". I don't think multi-dimensional mv arrays are supposed to be handled by the impl; the user should ensure the data is bytes-like before passing it. I think this should be fixed in the docs rather than in the impl.

kumaraditya303 • Jun 28 '25 14:06

If the multi-dimensional memory view (mv) is contiguous, then asyncio can send multi-dimensional mv arrays without issues.

However, if asyncio does not support sending multi-dimensional mv arrays, we can check the number of dimensions (ndim) of the mv and raise an error for the user. Additionally, we should provide a code example in the documentation to handle multi-dimensional mv.

As a user, I wish that asyncio could support sending multi-dimensional mv without errors.
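As one possible shape for such a documentation example, a caller can flatten the view to one-dimensional bytes before calling write(). This is a minimal sketch that assumes a C-contiguous view. The function name send_flattened, the socket.socketpair() setup and the 'H' item format are illustrative choices, not anything proposed in this thread.

```python
import asyncio
import socket


async def send_flattened():
    # A connected socket pair stands in for a real client/server connection.
    a, b = socket.socketpair()
    _, writer = await asyncio.open_connection(sock=a)
    reader, peer_writer = await asyncio.open_connection(sock=b)

    payload = bytes(i % 251 for i in range(120_000))
    # A 3-D view with the same shape as the array in the report.
    view = memoryview(payload).cast('H', shape=[1, 300, 200])

    # Flatten to 1-D bytes so that any slicing inside write() counts bytes.
    flat = view.cast('B')
    writer.write(flat)

    # Read concurrently so that drain() can make progress.
    received, _ = await asyncio.gather(
        reader.readexactly(flat.nbytes),
        writer.drain(),
    )

    writer.close()
    peer_writer.close()
    await writer.wait_closed()
    await peer_writer.wait_closed()
    return received == payload


ok = asyncio.run(send_flattened())
print(ok)
```

The cast does not copy the data; it only changes how the same buffer is indexed, so the cost to the caller is one extra method call.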

codingl2k1 • Jun 28 '25 15:06

The code already checks for "'data argument must be a bytes-like object"

AIUI a memoryview is a bytes-like object, but its elements are not necessarily bytes, and passing a byte count as an index when slicing relies on an unchecked and potentially incorrect assumption about the format and shape of the view. ISTM just updating the documentation is insufficient: if the write method has restrictions on what type of memoryview it can handle, those should be checked and enforced. As it is, we have the worst possible situation: it looks like this is supported and it all "works" normally, but will start failing and silently corrupting the binary data stream under certain rare conditions (e.g. when the system comes under heavy load).

If we do want to disallow such memoryviews, we could add a check like:

        if isinstance(data, memoryview):
            if data.itemsize != 1 or data.ndim != 1 or not data.c_contiguous:
                raise TypeError('memoryview buffer must be contiguous one-dimensional bytes')

However, this may break existing "working" code: writing "small" chunks of non-1d-byte data may in practice never trigger a partial write, and will appear to be working correctly, so someone out there (aside from OP) may well be relying on it. ISTM a little risky to potentially break such "working" code when we don't need to.

Having said that, I did not consider the non-contiguous-buffer case when writing the existing patch. It suffers the same problem of potentially (depending on how socket.send handles them) only failing on a partial write. Doing the cast at the top of the method would immediately and consistently catch that:

        if isinstance(data, memoryview) and (data.itemsize != 1 or data.ndim != 1):
            data = data.cast('c')
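The "cast at the top of the method" idea can be pictured as a small normalising helper. This is a sketch, not the withdrawn patch: the name as_byte_view is hypothetical, it uses format 'B' rather than 'c', and it also casts when the view is not C-contiguous, so that strided one-dimensional byte views are rejected up front as well.

```python
import array


def as_byte_view(data):
    """Return a 1-D view of single bytes over data (hypothetical helper)."""
    mv = memoryview(data)
    if mv.itemsize != 1 or mv.ndim != 1 or not mv.c_contiguous:
        # memoryview.cast() only accepts C-contiguous views, so a strided
        # view fails here with TypeError instead of on a later partial write.
        mv = mv.cast('B')
    return mv


ints = array.array('i', range(8))

# A contiguous view of 'i' items becomes a flat view of bytes.
flat = as_byte_view(memoryview(ints))
print(flat.ndim, flat.itemsize, flat.nbytes == len(ints) * ints.itemsize)

# A strided (non-contiguous) view is rejected immediately.
try:
    as_byte_view(memoryview(ints)[::2])
except TypeError as exc:
    rejected = True
    print("rejected:", exc)
else:
    rejected = False
```

With this approach existing callers that pass contiguous non-byte views keep working, and only views that could never be sent correctly raise.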

duaneg • Jun 29 '25 01:06

ISTM just updating the documentation is insufficient: if the write method has restrictions on what type of memoryview it can handle, those should be checked and enforced. As it is, we have the worst possible situation: it looks like this is supported and it all "works" normally, but will start failing and silently corrupting the binary data stream under certain rare conditions (e.g. when the system comes under heavy load).

Adding more checks for edge cases has a cost for normal code that would never hit them. I have never seen anyone pass a multi-dimensional array to network-related code. Note that there is a similar writelines method which does no checking of its argument and allows any buffer-like objects; would you add checks to that too? You would have to do expensive instance checks on all objects, which would slow down code for no real benefit.

I think fixing the docs is the appropriate thing to do for now, I'm -1 for any code changes here.

kumaraditya303 • Jun 29 '25 06:06

It's true that checking an instance is expensive. However, I don't want my asyncio server to hang randomly. If checking the instance is the only way to handle multi-dimensional mv, then you are just leaving the burden to the users, making it difficult to use correctly.

codingl2k1 • Jun 29 '25 09:06

I think fixing the docs is the appropriate thing to do for now, I'm -1 for any code changes here.

Very well. I strongly disagree, but such is life. I'll withdraw the PR.

duaneg • Jun 29 '25 10:06

If checking the instance is the only way to handle multi-dimensional mv, then you are just leaving the burden to the users, making it difficult to use correctly

The users in this case are not numerous, so the cost outweighs the benefits. Adding a check also means leaving this burden to the other users.


Now, I'd like to mention that we don't support multi-dimensional views in hashlib even though we are talking about bytes-like objects supporting the buffer protocol, that is, we do the following:

if isinstance(data, memoryview):
    if data.itemsize != 1 or data.ndim != 1 or not data.c_contiguous:
        raise TypeError('memoryview buffer must be contiguous one-dimensional bytes')

However, we do those checks in C. For hashlib, we're already very fast, but maybe a C helper for checking the buffer for asyncio may be viable? I don't know if this could help though.

picnixz • Jun 29 '25 11:06

It makes sense to check the input mv if the multi-dimensional mv is not supported. Just don't send truncated data without any notification.

codingl2k1 • Jun 29 '25 13:06

I have clarified this requirement in docs with https://github.com/python/cpython/pull/137910

kumaraditya303 • Aug 18 '25 13:08