light-speed-io
Multiple threads (each with a uring) & `crossbeam::deque` & a pipeline of I/O steps
Use-cases
- For `get`, we need to get the filesize for a batch of operations using io_uring (#41) whilst concurrently loading other operations. (We'd maybe want to add a `filesize_bytes: Option<u64>` field to `Operation::Get`; sketched below.)
- When we're loading many non-contiguous small chunks from a file, we'll want to `open` the file in one step, and read in another step. Possibly using separate threads for each step: one uring per thread.
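For illustration, a minimal sketch of what that variant might look like; the `location` field and the exact shape of `Operation` are assumptions, not the crate's actual definition:

```rust
use std::ffi::CString;

enum Operation {
    Get {
        location: CString,
        // Filled in by a preliminary "get filesize" op (#41), so the read
        // step knows how big a buffer to allocate.
        filesize_bytes: Option<u64>,
    },
    // ...other operations...
}
```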
Implementation and questions
- Users submit tasks using a `crossbeam::deque::Injector` (or `crossbeam::deque` or a `Stream`?)
- On startup, we'd create several worker threads, each with its own io_uring? (But how do we also have multiple threads per processing step? ~~Maybe each processing step is actually a Rayon threadpool, where each thread in that threadpool has its own io_uring, because we can't share io_urings across threads?~~)
- ~~Tasks first arrive in an "orchestrator" thread? Which, somehow, wires up other threads to do the work? (E.g. a simple "get filesize" operation would just go to one threadpool. But an "open file and read lots of non-contiguous chunks" operation would go through three steps: 1) get filesize, 2) open (steps 1 and 2 can happen concurrently), 3) read-and-close (chained in io_uring).)~~
- UPDATE: Don't use Rayon. Just have a user-settable number of worker threads, each with its own io_uring. Threads pop `Operation`s off the `Injector` FIFO. For multi-step operations (like get filesize, open, read lots of chunks), use the same thread. Each thread will also have a local queue of tasks to submit to its io_uring "ASAP". These tasks will always take precedence over the new tasks arriving from the `crossbeam::channel`. UPDATE: This sounds like what `crossbeam::deque` was designed for, so maybe I should use that. So, maybe the thread would look something like this (see also the sketch after this list):
  - The thread pops a `Task` from the `Injector` FIFO queue.
  - If this `Task` is a `get`, then the thread checks if its uring can accommodate any more files. ~~If not, put the `Task` onto the `Stealer` queue (shared with other threads; or, if possible, return it to the front of the `Injector` queue).~~ UPDATE: Actually, we can simplify this. Just program the thread not to pop any more tasks off the `Injector` if it can't handle any more files in flight. Then we don't need to "re-queue" tasks.
  - Assuming this is a `get` operation...
  - The thread first submits a "get filesize" op to its uring. And, if other `Task`s are waiting in the `Injector` queue, then it submits those tasks to the uring too, as long as we don't exceed the number of files this uring can handle.
  - When the "get filesize" CQE emerges, allocate a buffer (or re-use a buffer and/or use a registered buffer), and then add all the `read` ops to the thread's local task queue.
  - When the final `read` op completes, submit a `close` op.
  - While this is happening, this thread won't take tasks off the user queue, so other threads can take the load.
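Here's a minimal sketch of that pop-local-first scheduling, adapted from the `crossbeam::deque` docs (the io_uring submission itself is elided):

```rust
use crossbeam::deque::{Injector, Stealer, Worker};
use std::iter;

// Each worker thread prefers its own local queue (follow-up ops like `read`s
// that must go to its own uring), then the shared `Injector`, then stealing
// from other workers' queues.
fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
    local.pop().or_else(|| {
        iter::repeat_with(|| {
            global
                .steal_batch_and_pop(local)
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        .find(|s| !s.is_retry())
        .and_then(|s| s.success())
    })
}
```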
Cross references
- #32
- #33
- #44
- #60
What happens if we want to read 1 million chunks from a single file? As currently designed, that will all be handled by a single thread, and other threads will sit idle.
Some questions to answer:
- Is it possible for multiple urings to read in parallel from a single file?
  - I think we'd open the file once, without using a fixed descriptor. I.e., one thread will open the file, and then share that "normal" file descriptor with other worker threads.
  - Each worker thread can then use `register_files` to register that "normal" FD (see the sketch after this list).
- Does it improve performance to use multiple threads to read a single file?
  - ~~I should do some fio experiments.~~ Update: I don't think fio can read from a single file from multiple threads?
  - Even if using multiple threads doesn't help when reading a single file, does using multiple threads help when reading multiple files?
- ~~I had originally thought that there would be a 1:1 relationship between tasks and files. So reading 1 million chunks from a single file would be represented by a single task (in the `Injector` FIFO). But then we have to have a `Stealer` queue for threads to share work. Maybe it's easier if we just have a single `Task` be a single read operation (i.e. a single contiguous range of bytes). And the thread will need to check if the filename for each new task is a file that the thread already has open. Or maybe it's simpler to go back to the idea of having an "open" step?~~
- UPDATE: On second thoughts, I can't think of an efficient way to have a shared `Map` that maps from filename to FD. It will always be a bottleneck, no matter whether it's a shared `Map` behind a `Mutex` or a single thread which services "get FD" requests. So I've gone back to thinking that the most lock-free way to implement this is to have a 1:1 relationship between files and `Task`s, and to have a `Stealer` queue to allow threads to share tasks. So the thread that first receives a `Task` will be responsible for opening that file, and maybe `stat`-ing it. And then that thread will publish all the read ops (with the FD and filesize) on the `Stealer` queue.
- But how do we know when to close the file?! We could have an atomic integer which is decremented on every CQE, but that's locky. Instead, maybe it's better to have a thread which receives messages. The messages say "completed one op for this filename". And the datastructure will include a callback that should be run when the counter gets to zero. That callback will submit a "close" op to any uring. When that close op completes, we need to broadcast to all the threads that they can free up that file's fixed entry.
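As a sketch of the "open once, register everywhere" idea from above, using the `io-uring` crate; the filename, queue depth, and single-threaded layout are made up for brevity:

```rust
use io_uring::{opcode, types, IoUring};
use std::fs::File;
use std::os::unix::io::AsRawFd;

fn main() -> std::io::Result<()> {
    // One thread opens the file "normally" and shares the raw fd...
    let file = File::open("data.bin")?;
    let fd = file.as_raw_fd();

    // ...then each worker thread registers that fd with its *own* ring
    // and reads through the fixed slot.
    let mut ring = IoUring::new(64)?;
    ring.submitter().register_files(&[fd])?;

    let mut buf = vec![0u8; 4096];
    let read_op = opcode::Read::new(types::Fixed(0), buf.as_mut_ptr(), buf.len() as u32)
        .offset(0)
        .build();
    // SAFETY: `buf` outlives the completion of this read.
    unsafe { ring.submission().push(&read_op).expect("submission queue is full") };
    ring.submit_and_wait(1)?;
    Ok(())
}
```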
On the injector queue, there will be one file per `Operation`. Reading a million chunks from one file will be represented by one `Operation`.

The thread that receives that `Operation` will break it into many smaller `Operation`s: on the Worker and Stealer queues, there can be multiple `Operation`s per file, and each `Operation` will represent a single `read` or `readv` opcode. These smaller ops will probably stay on that thread. They'll only be stolen if other threads are idle. So it naturally solves the dilemma: if the workload is to open many files, then it's most efficient to only open one file per thread; but if the workload is to read many chunks from one file, then we do want to spread that across threads. (A sketch of the split follows.)
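A sketch of that split; all the names here are hypothetical:

```rust
use std::ffi::CString;
use std::ops::Range;
use std::sync::Arc;

// One read op per contiguous byte range, tagged with n_ops so the
// "closer" thread (below) knows when the file is finished.
struct ReadOp {
    filename: Arc<CString>,
    range: Range<u64>,
    n_ops: u32,
}

fn split(filename: CString, ranges: Vec<Range<u64>>) -> Vec<ReadOp> {
    let n_ops = ranges.len() as u32;
    let filename = Arc::new(filename);
    ranges
        .into_iter()
        .map(|range| ReadOp { filename: Arc::clone(&filename), range, n_ops })
        .collect()
}
```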
I think I've figured out how to know when to close each file: each `Operation` will have a `u32` to say "this operation is one of n ops on this file". When a thread completes an op, it will send a message to the "closer" thread to say "I've completed 1 op out of n ops on this filename". When the closer thread has seen n of these messages, it'll broadcast a message to all threads to say "close this filename". If a thread hasn't opened this filename, then that thread just ignores the message.
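A sketch of that "closer" bookkeeping; the message type and the broadcast mechanism are assumptions:

```rust
use std::collections::HashMap;
use std::ffi::CString;
use std::sync::mpsc::Receiver;

// "I've completed 1 op out of `n_ops` on this filename."
struct OpCompleted {
    filename: CString,
    n_ops: u32,
}

fn closer_thread(rx: Receiver<OpCompleted>, broadcast_close: impl Fn(&CString)) {
    let mut n_completed: HashMap<CString, u32> = HashMap::new();
    for msg in rx {
        let count = n_completed.entry(msg.filename.clone()).or_insert(0);
        *count += 1;
        if *count == msg.n_ops {
            n_completed.remove(&msg.filename);
            // Tell every worker thread: close this filename. Threads that
            // never opened it just ignore the message.
            broadcast_close(&msg.filename);
        }
    }
}
```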
To open the file: each thread will keep an internal `Map` that maps from filename to fixed file handle. If the filename doesn't exist in the map, then the thread first submits an open op to its uring, and then reads the file using its strictly internal (no stealing) queue.
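And a sketch of the per-thread map from filename to fixed slot (again, hypothetical names):

```rust
use std::collections::HashMap;
use std::ffi::CString;

// Strictly thread-local state: no locks, because no other thread sees it.
struct ThreadState {
    fixed_slots: HashMap<CString, u32>,
    next_slot: u32,
}

impl ThreadState {
    fn fixed_slot_for(&mut self, filename: &CString) -> u32 {
        if let Some(&slot) = self.fixed_slots.get(filename) {
            return slot; // Already open on this thread: reuse the slot.
        }
        // Not open yet: reserve a slot and submit an `OpenAt` to this
        // thread's uring (not shown); the reads go on the internal queue.
        let slot = self.next_slot;
        self.next_slot += 1;
        self.fixed_slots.insert(filename.clone(), slot);
        slot
    }
}
```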
See the fio results in this comment for evidence that multiple threads are required to achieve top speed.
Some updates:
I think most of the ideas above are still relevant.
One big change is that I'm now hoping to use Rayon to manage the threadpools, the injector queue, and the work-stealing queue. But I think I'll need my own queue of tasks that must happen on a particular thread: I'm imagining that each `UringOperation` will belong to exactly one thread.
Closing files
Multiple threads could be reading byte_ranges from the same file. How do we know when to close? Some options:
1. (Preferred option): Use `Arc::try_unwrap()`
We'd have a:
```rust
struct File {
    filename: CString,
    file_descriptor: io_uring::types::Fd,
}
```
Whenever we finish an op, we'd try `Arc::try_unwrap(file)`. If it returns an `Err`, then we know multiple clones exist. If it returns `Ok(File)`, then we know we're the last to use this file, and we need to take responsibility for closing it. This is also nice because we'll already be in a uring thread, and so we can submit the close op to our thread's internal job queue.
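For example (using a plain `i32` as a stand-in for the fd type):

```rust
use std::ffi::CString;
use std::sync::Arc;

struct File {
    filename: CString,
    file_descriptor: i32, // stand-in for `io_uring::types::Fd`
}

// Called by a worker when it finishes its last read on `file`:
fn finish_op(file: Arc<File>) {
    match Arc::try_unwrap(file) {
        // We held the last clone, so we take responsibility for closing:
        // e.g. push `UringOperation::Close` onto this thread's internal queue.
        Ok(file) => println!("close fd {} for {:?}", file.file_descriptor, file.filename),
        // Other threads still hold clones; the last of them will close it.
        Err(_still_shared) => {}
    }
}
```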
2. Build my own ref-counted `File` struct!
Something like this:
```rust
use std::ffi::CString;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::Sender;

struct FileInner {
    filename: CString,
    file_descriptor: io_uring::types::Fd,
    count: AtomicUsize, // Lives in the shared allocation, so every clone sees the same counter.
    // Ideally, we want the `Close` operation to be executed ASAP (so we don't use up our
    // quota of opened files). So it'd be nice if we can push this `UringOperation::Close`
    // to the front of the queue.
    channel: Sender<UringOperation>,
}

struct File {
    ptr: NonNull<FileInner>,
}

impl Clone for File {
    fn clone(&self) -> Self {
        // SAFETY: `ptr` is valid while at least one `File` exists.
        unsafe { self.ptr.as_ref() }.count.fetch_add(1, Ordering::Relaxed);
        File { ptr: self.ptr }
    }
}

impl Drop for File {
    fn drop(&mut self) {
        let inner = unsafe { self.ptr.as_ref() };
        // `fetch_sub` returns the *previous* value, so 1 means we were the last clone.
        if inner.count.fetch_sub(1, Ordering::AcqRel) == 1 {
            let _ = inner.channel.send(UringOperation::Close(inner.file_descriptor));
            // SAFETY: no other `File` can reach `ptr` now, so free the allocation.
            unsafe { drop(Box::from_raw(self.ptr.as_ptr())) };
        }
    }
}
```
This is also nice because it keeps the `filename` and `file_descriptor` together, and doesn't require a separate `filename: Arc<CString>` and a separate `n_ops_pending: AtomicUsize`.
On holiday, I'll try to read my new "Rust Atomics and Locks" book!
A downside of this is that I'll probably have to write some `unsafe` code that I wouldn't have to write if I just used `Arc`.
3. Have an `n_ops_pending: AtomicUsize` field in `GetRange`

When this gets to zero, send a message to close.
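A sketch, with hypothetical field and channel names:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::Sender;

struct GetRange {
    n_ops_pending: AtomicUsize,
    file_descriptor: i32, // stand-in for `io_uring::types::Fd`
}

// Run in each completion handler:
fn on_op_completed(get_range: &GetRange, close_tx: &Sender<i32>) {
    // `fetch_sub` returns the previous value: 1 means this was the last op.
    if get_range.n_ops_pending.fetch_sub(1, Ordering::AcqRel) == 1 {
        close_tx.send(get_range.file_descriptor).unwrap();
    }
}
```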
4. Use a separate thread which keeps track of which files need to be closed.
Maybe this separate thread also has its own uring?
This is all implemented in PR #125!