`io_uring_prep_send` returning zero in the CQE with polling
I'm witnessing an odd behavior whereby io_uring_prep_send SQEs get completed with res = 0, even though the connection is still alive and using send directly works just fine. This is on kernel 6.10.3, sending messages down a blocking TCP socket, with a polling uring.
I think this is due to MSG_DONTWAIT being passed to sock_sendmsg. MSG_DONTWAIT is set when IO_URING_F_NONBLOCK is set, which I think is the case for polling queues. So I'd imagine that when the socket buffer is full you'll just get 0.
Is this expected? I'd expect send on blocking sockets to block until space is present.
A few notes:
- Kernel 6.10 is not a maintained stable kernel. I don't think this is a kernel issue, but I'm mentioning it just in case, since that version has long since been deprecated and is no longer maintained.
- When you say "polling queues", what exactly are you referring to?
Normally, yes: any send will either complete inline, or it will wait for a POLLOUT trigger if the socket buffer is full. The completion then comes later as the socket drains.
Do you have some code that shows this issue? I feel like there are a lot of details missing above. This is often the case when you've been looking at your own code for a while; it's hard for an outside observer to make sense of it.
@axboe I do not have a small repro, this is part of a larger application. I didn't want to spend hours reducing the problem just in case my mental model of what should be happening was wrong. I'll try to get a small repro tomorrow.
@axboe oh and by "polling queues" I mean SQPOLL, i.e. I'm setting up io_uring like so:
void pollingUring(unsigned entries, uint32_t cpu, io_uring* ring) {
struct io_uring_params params;
memset(&params, 0, sizeof(params));
params.flags = IORING_SETUP_SINGLE_ISSUER|IORING_SETUP_SQPOLL|IORING_SETUP_SQ_AFF;
params.sq_thread_idle = ~(uint32_t)0;
params.sq_thread_cpu = cpu;
int ret = io_uring_queue_init_params(entries, ring, &params);
if (ret < 0) {
throw EXPLICIT_SYSCALL_EXCEPTION(-ret, "io_uring_queue_init_params");
}
if (!(params.features & IORING_FEAT_FAST_POLL)) {
throw EXCEPTION("IORING_FEAT_FAST_POLL not available, we need it since we read/write from many sockets at once.");
}
if (!(params.features & IORING_FEAT_NODROP)) {
throw EXCEPTION("IORING_FEAT_NODROP not available, we need it since we might oversubscribe the completion ring.");
}
if (!(params.features & IORING_FEAT_SQPOLL_NONFIXED)) {
throw EXCEPTION("IORING_FEAT_SQPOLL_NONFIXED not available, we need it since we're lazy.");
}
}
@axboe oh and by "polling queues" I mean SQPOLL, i.e. I'm setting up io_uring like so:
Ah gotcha. Polling is one of those terribly overloaded words, it can refer to a number of very different things.
@axboe I do not have a small repro, this is part of a larger application. I didn't want to spend hours reducing the problem just in case my mental model of what should be happening was wrong. I'll try to get a small repro tomorrow.
Thanks - in my experience this helps a lot as it shows me exactly what you are doing, in case it's a code issue. And it's often helpful on the reporter side too, as it forces you to consider cases you didn't before :-).
@axboe sure, I'll try to reduce to a small repro.
In the meantime, an alternative question might simply be: when can I expect send SQEs to have CQEs with res=0 on blocking sockets? Note that this question is pretty hard to answer even without io_uring, but my expectation would be that it should never happen, possibly barring cases where you get a shutdown while you're doing the send or something like that.
I'm verifying that it's io_send returning zero with bpftrace:
$ sudo bpftrace -e 'kretfunc:vmlinux:io_send { printf("pid=%d comm=%s opcode=%u res=%d ret=%d\n", pid, comm, args->req->opcode, args->req->cqe.res, retval); }'
<tons of activity>
pid=3036728 comm=iou-sqp-3036732 opcode=26 res=261932 ret=0
pid=3036728 comm=iou-sqp-3036732 opcode=26 res=229353 ret=0
pid=3036728 comm=iou-sqp-3036732 opcode=26 res=0 ret=0 # <--- unexpected zero
Also, it seems to get stuck in some doom loop where no matter how many times I resubmit I always get back zero.
I'm assuming 'res' here is 'cqe->res', but what is 'ret'?
Generally you expect the socket/networking part to return -EAGAIN if nothing can be sent right now, in which case io_uring would go through the motions of deferring until POLLOUT before retrying and posting a completion when successful. So if you get 0, that would seem to imply that the networking side is returning 0. That's also consistent with the fact that any retries you do on the app side still get a 0 return.
I'm assuming this reproduces identically without SQPOLL set for ring init?
@axboe ret is just the return value of io_send. Yes, I think it's just sock_sendmsg returning 0 because of MSG_DONTWAIT being set if SQPOLL, but then the question is whether this is desirable. In this case I'd imagine it's returning 0 because the skb is full and MSG_DONTWAIT makes it not wait for space.
I haven't tried without SQPOLL (again, not totally trivial to do). I'll do that and hopefully the repro tomorrow.
Ok, I'll wait for that before looking into this any further. Just to rule out a kernel issue, may not be a bad idea to use a supported stable kernel and double check that it reproduces there too.
OK, here's the repro:
#define _GNU_SOURCE
#include <string.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <liburing.h>
#include <errno.h> // Required for errno
#define fail(fmt, ...) do { fprintf(stderr, fmt "\n" __VA_OPT__(,) __VA_ARGS__); exit(EXIT_FAILURE); } while (0)
static const size_t writeSize = 4ull<<30; // 4GiB
static const size_t readSize = 1<<10; // 1KiB
static const uint16_t listenPort = 6000;
static int listenFd;
static struct io_uring uring;
static struct sockaddr_in clientAddr;
static socklen_t clientLen = sizeof(clientAddr);
static const char* serverBuf;
static const uint8_t ACCEPT = 1;
static const uint8_t SEND = 2;
static void setupUring(unsigned entries, struct io_uring* ring) {
struct io_uring_params params;
memset(&params, 0, sizeof(params));
params.flags = IORING_SETUP_SINGLE_ISSUER|IORING_SETUP_DEFER_TASKRUN;
int ret = io_uring_queue_init_params(entries, ring, &params);
if (ret < 0) {
fail("io_uring_queue_init_params %s", strerror(-ret));
}
}
static void setupServerSocket() {
struct sockaddr_in sa = {
.sin_family = AF_INET,
.sin_port = htons(listenPort),
};
if (inet_pton(AF_INET, "0", &sa.sin_addr) < 0) {
fail("can't get bind ip");
}
listenFd = socket(AF_INET, SOCK_STREAM, 0);
if (listenFd < 0) {
fail("socket %d", errno);
}
{
int yes = 1;
if (setsockopt(listenFd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)) < 0) {
fail("setsockopt %d", errno);
}
}
if (bind(listenFd, (const struct sockaddr*)&sa, sizeof(sa)) < 0) {
fail("bind %d", errno);
}
if (listen(listenFd, 1) < 0) {
fail("listen %d", errno);
}
fprintf(stderr, "listening at %d\n", listenPort);
}
static struct io_uring_sqe* getSqe() {
struct io_uring_sqe* sqe = io_uring_get_sqe(&uring);
if (!sqe) {
io_uring_submit(&uring);
sqe = io_uring_get_sqe(&uring);
}
if (!sqe) {
fail("could not get sqe after submission");
}
return sqe;
}
static void addSend(int fd) {
struct io_uring_sqe* sqe = getSqe();
sqe->user_data = ((uint64_t)SEND << 56) | fd;
io_uring_prep_send(sqe, fd, serverBuf, writeSize, MSG_NOSIGNAL);
}
static void addAccept() {
struct io_uring_sqe* sqe = getSqe();
io_uring_prep_accept(sqe, listenFd, (struct sockaddr*)&clientAddr, &clientLen, 0);
sqe->user_data = (uint64_t)ACCEPT << 56;
}
static void processAccept(struct io_uring_cqe* cqe) {
if (cqe->res < 0) {
fail("accept %s", strerror(-cqe->res));
}
fprintf(stderr, "client connected at %d\n", ntohs(clientAddr.sin_port));
int fd = cqe->res;
addSend(fd);
addAccept();
}
static void processSend(int fd, struct io_uring_cqe* cqe) {
if (cqe->res == 0) {
fprintf(stderr, "got 0 in cqe->res, closing fd, flags=%016x\n", cqe->flags);
close(fd);
return;
}
if (cqe->res < 0) {
fprintf(stderr, "got error in cqe->res (%s), closing fd\n", strerror(-cqe->res));
close(fd);
return;
}
addSend(fd);
}
static void allocateServerBuf() {
int fd = open(".", O_TMPFILE|O_RDWR, 0600);
if (fd < 0) {
fail("open %s", strerror(errno));
}
if (ftruncate(fd, writeSize) < 0) {
fail("ftruncate %s", strerror(errno));
}
serverBuf = mmap(NULL, writeSize, PROT_READ, MAP_PRIVATE, fd, 0);
if (serverBuf == MAP_FAILED) {
fail("mmap %s", strerror(errno));
}
}
static void server() {
allocateServerBuf();
setupUring(16, &uring);
setupServerSocket();
addAccept();
for (;;) {
{
int ret = io_uring_submit_and_wait(&uring, 1);
if (ret < 0) {
fail("io_uring_submit_and_wait %d", ret);
}
}
struct io_uring_cqe* cqe;
unsigned head;
unsigned count = 0;
io_uring_for_each_cqe(&uring, head, cqe) {
count++;
uint8_t cmd = cqe->user_data >> 56;
uint64_t data = cqe->user_data & ~(0xFFull << 56);
switch (cmd) {
case ACCEPT:
processAccept(cqe);
break;
case SEND: {
int fd = data;
processSend(fd, cqe);
break; }
default:
fail("bad command %d", cmd);
}
}
io_uring_cq_advance(&uring, count);
}
}
static int connectToServer() {
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
fail("socket %s", strerror(errno));
}
struct sockaddr_in servAddr;
memset((char*)&servAddr, 0, sizeof(servAddr));
servAddr.sin_family = AF_INET;
servAddr.sin_port = htons(listenPort);
if (inet_addr("127.0.0.1") == INADDR_NONE) {
fail("inet_addr");
}
servAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
if (connect(fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0) {
fail("connect %s", strerror(errno));
}
return fd;
}
static void client() {
int fd = connectToServer();
fprintf(stderr, "connected to server\n");
char* buf = (char*)malloc(readSize);
if (!buf) {
fail("could not allocate");
}
for (;;) {
ssize_t r = read(fd, buf, readSize);
if (r < 0) {
fail("couldn't read %s", strerror(errno));
}
}
}
static void usage(int argc, const char** argv) {
fprintf(stderr, "usage: %s server|client\n", argv[0]);
exit(2);
}
int main(int argc, const char** argv) {
if (argc != 2) {
usage(argc, argv);
}
if (strcmp("server", argv[1]) == 0) {
server();
} else if (strcmp("client", argv[1]) == 0) {
client();
} else {
usage(argc, argv);
}
}
I run it like this:
% clang -Wall -O2 uring-zero-send.c -o uring-zero-send -luring
% ./uring-zero-send server
% ./uring-zero-send client
This is on linux 6.15.4, liburing 2.9.
All this does is spawn a server which repeatedly issues a 4GiB send using io_uring_prep_send. The client just reads forever using read. This should keep going indefinitely.
However, if the buffer is a multiple of 4GiB (like it is in the test case), I get zero in cqe->res, i.e.:
% ./uring-zero-send server
listening at 6000
client connected at 55722
got 0 in cqe->res, closing fd, flags=0000000000000000
4GiB +- 1 is fine. So something about the buffer being a multiple of 4GiB throws io_uring off; presumably it goes through a uint32_t somewhere. SQPOLL was a red herring, the code above does not use it.
And just for completeness, here's a version which adds a non-uring server, just to verify that normal send works:
#define _GNU_SOURCE
#include <string.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <liburing.h>
#include <errno.h> // Required for errno
#define fail(fmt, ...) do { fprintf(stderr, fmt "\n" __VA_OPT__(,) __VA_ARGS__); exit(EXIT_FAILURE); } while (0)
static const size_t writeSize = 4ull<<30; // 4GiB
static const size_t readSize = 1<<10; // 1KiB
static const uint16_t listenPort = 6000;
static int listenFd;
static struct io_uring uring;
static struct sockaddr_in clientAddr;
static socklen_t clientLen = sizeof(clientAddr);
static const char* serverBuf;
static const uint8_t ACCEPT = 1;
static const uint8_t SEND = 2;
static void setupUring(unsigned entries, struct io_uring* ring) {
struct io_uring_params params;
memset(&params, 0, sizeof(params));
params.flags = IORING_SETUP_SINGLE_ISSUER|IORING_SETUP_DEFER_TASKRUN;
int ret = io_uring_queue_init_params(entries, ring, &params);
if (ret < 0) {
fail("io_uring_queue_init_params %s", strerror(-ret));
}
}
static void setupServerSocket() {
struct sockaddr_in sa = {
.sin_family = AF_INET,
.sin_port = htons(listenPort),
};
if (inet_pton(AF_INET, "0", &sa.sin_addr) < 0) {
fail("can't get bind ip");
}
listenFd = socket(AF_INET, SOCK_STREAM, 0);
if (listenFd < 0) {
fail("socket %d", errno);
}
{
int yes = 1;
if (setsockopt(listenFd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)) < 0) {
fail("setsockopt %d", errno);
}
}
if (bind(listenFd, (const struct sockaddr*)&sa, sizeof(sa)) < 0) {
fail("bind %d", errno);
}
if (listen(listenFd, 1) < 0) {
fail("listen %d", errno);
}
fprintf(stderr, "listening at %d\n", listenPort);
}
static struct io_uring_sqe* getSqe() {
struct io_uring_sqe* sqe = io_uring_get_sqe(&uring);
if (!sqe) {
io_uring_submit(&uring);
sqe = io_uring_get_sqe(&uring);
}
if (!sqe) {
fail("could not get sqe after submission");
}
return sqe;
}
static void addSend(int fd) {
struct io_uring_sqe* sqe = getSqe();
sqe->user_data = ((uint64_t)SEND << 56) | fd;
io_uring_prep_send(sqe, fd, serverBuf, writeSize, MSG_NOSIGNAL);
}
static void addAccept() {
struct io_uring_sqe* sqe = getSqe();
io_uring_prep_accept(sqe, listenFd, (struct sockaddr*)&clientAddr, &clientLen, 0);
sqe->user_data = (uint64_t)ACCEPT << 56;
}
static void processAccept(struct io_uring_cqe* cqe) {
if (cqe->res < 0) {
fail("accept %s", strerror(-cqe->res));
}
fprintf(stderr, "client connected at %d\n", ntohs(clientAddr.sin_port));
int fd = cqe->res;
addSend(fd);
addAccept();
}
static void processSend(int fd, struct io_uring_cqe* cqe) {
if (cqe->res == 0) {
fprintf(stderr, "got 0 in cqe->res, closing fd, flags=%016x\n", cqe->flags);
close(fd);
return;
}
if (cqe->res < 0) {
fprintf(stderr, "got error in cqe->res (%s), closing fd\n", strerror(-cqe->res));
close(fd);
return;
}
addSend(fd);
}
static void allocateServerBuf() {
int fd = open(".", O_TMPFILE|O_RDWR, 0600);
if (fd < 0) {
fail("open %s", strerror(errno));
}
if (ftruncate(fd, writeSize) < 0) {
fail("ftruncate %s", strerror(errno));
}
serverBuf = mmap(NULL, writeSize, PROT_READ, MAP_PRIVATE, fd, 0);
if (serverBuf == MAP_FAILED) {
fail("mmap %s", strerror(errno));
}
}
static void serverUring() {
allocateServerBuf();
setupUring(16, &uring);
setupServerSocket();
addAccept();
for (;;) {
{
int ret = io_uring_submit_and_wait(&uring, 1);
if (ret < 0) {
fail("io_uring_submit_and_wait %d", ret);
}
}
struct io_uring_cqe* cqe;
unsigned head;
unsigned count = 0;
io_uring_for_each_cqe(&uring, head, cqe) {
count++;
uint8_t cmd = cqe->user_data >> 56;
uint64_t data = cqe->user_data & ~(0xFFull << 56);
switch (cmd) {
case ACCEPT:
processAccept(cqe);
break;
case SEND: {
int fd = data;
processSend(fd, cqe);
break; }
default:
fail("bad command %d", cmd);
}
}
io_uring_cq_advance(&uring, count);
}
}
static void serverVanilla() {
allocateServerBuf();
setupServerSocket();
int clientFd = accept(listenFd, NULL, NULL);
if (clientFd < 0) {
fail("accept %s", strerror(errno));
}
fprintf(stderr, "client connected, sending\n");
for (;;) {
ssize_t r = send(clientFd, serverBuf, writeSize, MSG_NOSIGNAL);
if (r < 0) {
fail("send %s", strerror(errno));
}
if (r == 0) {
fail("send returned zero");
}
}
}
static int connectToServer() {
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
fail("socket %s", strerror(errno));
}
struct sockaddr_in servAddr;
memset((char*)&servAddr, 0, sizeof(servAddr));
servAddr.sin_family = AF_INET;
servAddr.sin_port = htons(listenPort);
if (inet_addr("127.0.0.1") == INADDR_NONE) {
fail("inet_addr");
}
servAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
if (connect(fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0) {
fail("connect %s", strerror(errno));
}
return fd;
}
static void client() {
int fd = connectToServer();
fprintf(stderr, "connected to server\n");
char* buf = (char*)malloc(readSize);
if (!buf) {
fail("could not allocate");
}
for (;;) {
ssize_t r = read(fd, buf, readSize);
if (r < 0) {
fail("couldn't read %s", strerror(errno));
}
}
}
static void usage(int argc, const char** argv) {
fprintf(stderr, "usage: %s server|client\n", argv[0]);
exit(2);
}
int main(int argc, const char** argv) {
if (argc != 2) {
usage(argc, argv);
}
if (strcmp("server", argv[1]) == 0) {
serverUring();
} else if (strcmp("server-vanilla", argv[1]) == 0) {
serverVanilla();
} else if (strcmp("client", argv[1]) == 0) {
client();
} else {
usage(argc, argv);
}
}
Ah I see. sqe->len is an unsigned 32-bit, so 4G will make it 0. So you essentially end up asking for 0 bytes, and that's what you then get in return.
Note that on Linux, for any syscall, the transfer limit is always going to be 2G at the most.
@axboe that makes sense, and I knew about the 0x7ffff000 maximum for write/send, but I'd argue it's still wrong that this just does not work with io_uring. It works just fine for raw write/send calls.
Maybe some len & ~(uint32_t)0 should be replaced with a min(len, ~(uint32_t)0)?
This is the API, it's a 32-bit unsigned. There's no way I/liburing/io_uring can know you're setting it to 4G, when it's already truncated by the time liburing sees it. Arguably the bug here is that io_uring_prep_send() takes a size_t; it should take an unsigned int...
@axboe yes, if that is the API then io_uring_prep_send should be changed to take a u32 length. Otherwise I'd imagine most people would assume that the API matches the one for send, which has a size_t length.
If the prep side could return an error, then it could flag it. But it cannot. Perhaps it should just truncate, e.g.:
if (len != (unsigned int) len)
    len = 2G;
or something like that. But it's also a bit annoying to have a branch for that; it could most likely be done in a cleaner fashion.
It's not as easy as changing the size_t to an unsigned int, as that would potentially break existing applications. The 2.x series of liburing can all be installed and work as-is for applications without requiring a re-compile/link.
So I think the best option here is to add it to the man page(s) and perhaps cap it in a nice fashion that doesn't entail a branch. And then for 3.x, or whenever a breaking upgrade is done, the signatures could change to take the appropriate type.
You don't need any branch, just uint32_t len32 = std::min<size_t>(len, ~(uint32_t)0) (forgive the C++), which will probably compile to a cmov.
(And even the branch would be super predictable, that is, almost free)
Right, that's what I'm alluding to; it can be done like that rather than my pseudo code, which mostly just demonstrates the idea.
Want to send some patches?
(And even the branch would be super predictable, that is, almost free)
It's still a branch, even if it'll predict nicely. And it's not like the branch predictor history is infinite.