light-speed-io
What would users' code look like for simple use-cases? (After dropping `async`, and instead only using `channels` and `Rayon`)
For reference: Here's a quick code sketch I wrote, back when I was planning to use Tokio with Rayon.
Use cases
1) File conversion & re-chunking
- Send LSIO a list of millions of chunks to read.
- Have a Rayon threadpool which decompresses those chunks as soon as they arrive from disk. Tell the IO code to re-use IO buffers as soon as the data has been decompressed from an IO buffer.
- Have a thread which "reduces" and creates output chunks. Let's say that each output chunk contains 10 input chunks.
- Rayon threadpool compresses those chunks.
- And writes those compressed chunks back into the io_uring channel for writing to disk.
2) ML training: Load random crops of Zarr data from disk.
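For the random-crop use case, the first step is working out which chunks a crop touches. Here's a minimal sketch (not LSIO code; a 1-D array with fixed-size chunks is assumed for simplicity):

```rust
use std::ops::RangeInclusive;

/// Which chunk indices does a crop over `crop_start..crop_end` touch,
/// assuming the array is split into fixed-size chunks of `chunk_len` elements?
fn chunks_for_crop(crop_start: usize, crop_end: usize, chunk_len: usize) -> RangeInclusive<usize> {
    // The last element of the crop is at index `crop_end - 1`:
    crop_start / chunk_len..=(crop_end - 1) / chunk_len
}

fn main() {
    // A crop of elements 250..600 over 100-element chunks touches chunks 2..=5:
    assert_eq!(chunks_for_crop(250, 600, 100), 2..=5);
    // A crop entirely within one chunk touches just that chunk:
    assert_eq!(chunks_for_crop(0, 100, 100), 0..=0);
}
```

The resulting chunk indices would then become the byte ranges submitted to the IO layer.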
3) Vincent's use-case: Loading timeseries (added 2024-04-04)
Imagine the chunks are arranged like this:
0.0 0.1 0.2 0.3
1.0 1.1 1.2 1.3
2.0 2.1 2.2 2.3
3.0 3.1 3.2 3.3
Each row is a timeseries. So, for example, chunks 0.0, 0.1, 0.2, 0.3 have to be given to the user in order.
But the decompression could happen out-of-order.
Perhaps the solution is for the user to group each row. And then, within a row, decompress chunks in any order. And then re-order the decompressed chunks before delivering to the user?
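The re-ordering step above can be sketched with a `BTreeMap` that buffers out-of-order arrivals and emits the contiguous prefix as soon as it's complete (a dependency-free sketch; the function name and types are hypothetical, not LSIO API):

```rust
use std::collections::BTreeMap;

/// Re-order chunks that finish decompressing out-of-order. Each item carries
/// its position within the row; buffer items in a BTreeMap and deliver a
/// contiguous prefix as soon as it is complete.
fn reorder(arrivals: Vec<(usize, &'static str)>) -> Vec<&'static str> {
    let mut pending = BTreeMap::new();
    let mut next = 0;
    let mut ordered = Vec::new();
    for (idx, chunk) in arrivals {
        pending.insert(idx, chunk);
        // Drain every chunk that is now ready to be delivered in order:
        while let Some(ready) = pending.remove(&next) {
            ordered.push(ready);
            next += 1;
        }
    }
    ordered
}

fn main() {
    // Chunks 0.0..0.3 finish decompressing out-of-order:
    let arrivals = vec![(2, "0.2"), (0, "0.0"), (3, "0.3"), (1, "0.1")];
    assert_eq!(reorder(arrivals), vec!["0.0", "0.1", "0.2", "0.3"]);
}
```

In a streaming version, `ordered.push` would become a `send` on the per-row output channel, so the user starts receiving in-order chunks before the whole row has been decompressed.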
UPDATE: This attempt is pretty dumb :slightly_smiling_face:. See the comment below for a new, improved approach!
Here's an attempt at making use-case 1 work:
Inputs:
- `src_filenames: Receiver<Vec<CString>>`: The million files to load, divided into 100,000 `Vec<CString>`s, where each `Vec<CString>` is a single group. Group n will finish loading before group n+1 starts loading.
const MAX_N_BUFFERS: usize = 1024;
let mut uring_local = IoUringLocal::new(MAX_N_BUFFERS);
// Start loading the files in a separate threadpool. Completed buffers will
// appear in `rx_of_filled_buffers`. We apply back-pressure by either letting
// `rx_of_filled_buffers` fill to `MAX_N_BUFFERS`, or by not telling `uring_local`
// that it can re-use buffers. This will not block. This will start loading immediately.
let rx_of_filled_buffers = uring_local.get(src_filenames);
// Decompress buffers:
// This will run `decompression_function` on each chunk using separate Rayon threadpool:
let decompressor = ChannelProcessor::new();
let rx_of_decompressed_buffers = decompressor.map(rx_of_filled_buffers, decompression_function);
// Tell io_uring to re-use IO buffers as soon as decompression has finished for each chunk:
// This function passes-through each completed chunk:
let rx_of_decompressed_buffers = uring_local.reuse_buffers(rx_of_decompressed_buffers);
// Reduce:
let rx_of_reduced_buffers = reduce(rx_of_decompressed_buffers, 10);
// Compress:
let rx_of_compressed_output_buffers = compressor.map(rx_of_reduced_buffers, compression_function);
// Write. Behind the scenes, this will be added to the same uring threadpool's list of operations, I guess?
uring_local.put(rx_of_compressed_output_buffers);
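The `reduce` step in the sketch above could look something like this (a dependency-free sketch using `std::sync::mpsc` rather than crossbeam; `Vec<u8>` stands in for a decompressed buffer, and the function name mirrors the sketch, not a real LSIO API):

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Collect `group_size` decompressed chunks from the input channel into one
/// output chunk, sending each output chunk downstream as soon as it's full.
fn reduce(rx: Receiver<Vec<u8>>, group_size: usize) -> Receiver<Vec<u8>> {
    let (tx, out_rx) = channel();
    thread::spawn(move || {
        let mut acc: Vec<u8> = Vec::new();
        let mut n = 0;
        for chunk in rx {
            acc.extend_from_slice(&chunk);
            n += 1;
            if n == group_size {
                tx.send(std::mem::take(&mut acc)).unwrap();
                n = 0;
            }
        }
        // Flush any partial final group:
        if !acc.is_empty() {
            tx.send(acc).unwrap();
        }
    });
    out_rx
}

fn main() {
    let (tx, rx) = channel();
    for i in 0..10u8 {
        tx.send(vec![i]).unwrap();
    }
    drop(tx);
    let reduced: Vec<Vec<u8>> = reduce(rx, 5).into_iter().collect();
    assert_eq!(reduced.len(), 2);
    assert_eq!(reduced[0], vec![0, 1, 2, 3, 4]);
}
```

Because the accumulator thread reads from a channel, back-pressure composes naturally with the rest of the pipeline.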
Writing the code in the comment above gave me an idea that I've summarised in issue #106
UPDATE 2024-04-03: The code below is out-of-date! Please see this code sketch for the latest ideas: https://github.com/JackKelly/light-speed-io/blob/new-design-March-2024/src/new_design_march_2024.rs
Let me try again, now that I've figured out that we can just use iterators (see this code sketch)!
Structs
struct ByteRange<M> {
byte_range: Range<isize>,
// metadata is used to identify this byte range.
// For example, in Zarr, this would be used to identify the
// location at which this chunk appears in the merged array.
metadata: M,
}
enum OperationKind {
GetRanges,
PutRanges,
}
struct Operation<M> {
operation_kind: OperationKind,
buffers: Option<Vec<AlignedBuffer>>, // For PutRanges
byte_ranges: Option<Vec<ByteRange<M>>>, // For GetRanges and PutRanges
filename: CString, // For GetRanges and PutRanges
}
impl<M> Operation<M> {
/// If the user submits a GetRanges operation with an invalid filename then
/// the user will receive a single Err(std::io::ErrorKind::NotFound) with context
/// that describes the filename that failed.
/// If a subset of the `byte_ranges` results in an error (e.g. reading beyond
/// end of the file) then the user will receive a mixture of `Ok(Output::Buffer)`
/// and `Err`, where the `Err` will include context such as the filename and byte_range.
fn get_ranges(filename: CString, byte_ranges: Vec<ByteRange<M>>) -> Self {
}
fn put_ranges(filename: CString, byte_ranges: Vec<ByteRange<M>>, buffers: Vec<AlignedBuffer>) -> Self {
// TODO: Maybe we also need a `slices: &[u8]` field, which gives one slice
// per `byte_range`, whilst also having a `buffers` field to own the `AlignedBuffer`.
}
}
struct OpGroup<M> {
operations: Receiver<Operation<M>>,
// Metadata for the whole group. Such as the filename of the merged output.
metadata: M,
}
struct Output<M> {
// Each `byte_range` within an `Operation::GetRanges` returns a `Buffer`.
operation_kind: OperationKind,
buffer: Option<AlignedBuffer>,
byte_range: Option<ByteRange<M>>,
}
struct OutputGroup<GROUPMETA, OUTPUTMETA> {
// We use a `Receiver` so we can process the next `Buffer` as soon as the producing
// thread finishes each `Buffer`:
outputs: Receiver<Result<Output<OUTPUTMETA>>>,
// Metadata for the group (e.g. the output filename).
metadata: GROUPMETA,
}
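Given that `outputs` is a `Receiver` of `Result`s (per the error semantics described in `get_ranges` above), consuming a group might look like this (a simplified sketch: `Vec<u8>` and `String` stand in for `Output` and the real error type):

```rust
use std::sync::mpsc::channel;

fn main() {
    // Each byte range yields Ok(buffer), or an Err with context such as the
    // filename and byte_range (e.g. reading beyond the end of the file):
    let (tx, outputs) = channel::<Result<Vec<u8>, String>>();
    tx.send(Ok(vec![1, 2, 3])).unwrap();
    tx.send(Err("example: byte_range beyond end of file".into())).unwrap();
    tx.send(Ok(vec![4, 5])).unwrap();
    drop(tx);

    // The user can process successes as they arrive, and collect errors:
    let (ok, errs): (Vec<_>, Vec<_>) = outputs.into_iter().partition(|r| r.is_ok());
    assert_eq!(ok.len(), 2);
    assert_eq!(errs.len(), 1);
}
```

The key point is that a single failed byte range doesn't poison the whole group: the user receives a mixture of `Ok` and `Err` items on the same channel.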
User code
const MAX_N_BUFFERS: usize = 1024;
let mut uring_local = IoUringLocal::new(MAX_N_BUFFERS);
let mut submission_queue: Sender<OpGroup> = uring_local.submission();
// Define operations to get a bunch of files:
let get_group_0 = OpGroup::new()
.extend(vec![
Operation::get_ranges("foo.0.0", 0..-1),
Operation::get_ranges("foo.0.1", 0..-1),
])
.metadata(OutputFilename("foo_0"));
// Define operations to get a bunch of files:
let get_group_1 = OpGroup::new()
.extend(vec![
Operation::get_ranges("foo.1.0", 0..-1),
Operation::get_ranges("foo.1.1", 0..-1),
])
.metadata(OutputFilename("foo_1"));
// Start loading the files in a separate threadpool:
submission_queue.send(get_group_0).unwrap();
submission_queue.send(get_group_1).unwrap();
// uring_local will load all operations from `get_group_0`. And then from `get_group_1`.
// Now we can wait on the completed items.
let completion_queue: Receiver<OutputGroup> = uring_local.completion();
let mut buffer_recycling_queue = uring_local.buffer_recycling_queue();
completion_queue.into_iter().par_bridge().for_each(|output_group: OutputGroup| {
let out = output_group.outputs.into_iter().par_bridge()
.map(|mut output| {
assert_eq!(output.operation_kind, OperationKind::GetRanges);
// Take ownership of the buffer so it can be recycled after decompression:
let buffer = output.buffer.take().unwrap();
let decompressed = decompress(&buffer);
buffer_recycling_queue.send(buffer).unwrap();
decompressed
})
.reduce(reduce_func);
let out = compress(out);
// Write `out` to disk:
let put_op = Operation::put_ranges(output_group.metadata.output_filename, 0..-1, out);
let op_group = OpGroup::new().append(put_op);
submission_queue.send(op_group); // Does not block.
});
Actually, I don't think it's acceptable for the output channel to be a `Receiver<Vec<Chunk>>`, because we want to start processing chunks as soon as they're ready. Maybe a channel of channels?! Or a channel of crossbeam queues?
But we probably do want the input channel to be a `Sender<Vec<IoOperation>>`. Or, perhaps better, a `Sender<IoOpGroup>` (so we can attach metadata for the whole group).
UPDATE: I've updated the code in the comment above
UPDATE 2024-04-03: The code above is out-of-date! Please see this code sketch for the latest ideas: https://github.com/JackKelly/light-speed-io/blob/new-design-March-2024/src/new_design_march_2024.rs
I need to think more about what the user code would look like (to satisfy the use-cases above) with the new code sketch. Some questions that spring to mind:
- Can we use something like a `Sender<IoOpGroup>` to send groups to the IO threadpool? That way we can attach group-wide metadata (such as the final output filename for that whole group).
- Or can we use a flat structure, where every `Chunk` struct has a `group: Arc<GROUP>`? Operations for a single group would be submitted together. The `GROUP` object would contain group metadata (such as the output filename).
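The flat-structure option can be sketched like this (struct and field names are hypothetical illustrations, not LSIO API):

```rust
use std::sync::Arc;

// Group-wide metadata, shared by every chunk in the group:
struct GroupMeta {
    output_filename: String,
}

// Each chunk carries a cheap, shared pointer to its group's metadata:
struct Chunk {
    index: usize,
    group: Arc<GroupMeta>,
}

fn main() {
    let group = Arc::new(GroupMeta {
        output_filename: "foo_0".to_string(),
    });
    let chunks: Vec<Chunk> = (0..4)
        .map(|index| Chunk { index, group: Arc::clone(&group) })
        .collect();
    // Every chunk can reach its group's metadata without a separate lookup:
    assert!(chunks.iter().all(|c| c.group.output_filename == "foo_0"));
    assert_eq!(chunks[3].index, 3);
}
```

One attraction of this shape is that the submission queue stays a flat `Sender<Chunk>`; the downside, explored below, is knowing when a group is finished.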
> can we use a flat structure
We can't have a flat list, where list items define the end of groups, because Rayon processes things out-of-order. So the following code never seems to print 0, 1, 2. Instead it prints 0, 1. Or even just 0.
use rayon::prelude::*;
#[derive(Debug)]
enum Operation {
Get(u8),
EndOfGroup(u8),
}
fn main() {
let ops = vec![
Operation::Get(0),
Operation::Get(1),
Operation::Get(2),
Operation::EndOfGroup(0),
Operation::Get(10),
Operation::Get(11),
Operation::EndOfGroup(1),
];
// Oops: Rayon may process these items out-of-order. So we might start loading group 1 before
// we hit the EndOfGroup(0).
ops.into_par_iter()
.map(|op| {
if matches!(op, Operation::Get { .. }) {
Some(op)
} else {
None
}
})
.while_some()
.for_each(|op| println!("{:?}", op));
}
So I think we may have to use a channel of channels for the completion queue!
I should test the channel of channels idea in my Rust playground
Yup, using channels of channels works for both submission and completion!
use rayon::prelude::*;
#[derive(Debug)]
enum Operation {
Get(u8),
}
fn main() {
let (completion_tx, completion_rx) = crossbeam::channel::bounded(4);
{
let (submission_tx, submission_rx) = crossbeam::channel::bounded(4);
// Send the first group of operations:
let (inner_submission_tx_0, inner_submission_rx_0) = crossbeam::channel::bounded(4);
vec![
Operation::Get(1),
Operation::Get(2),
Operation::Get(3),
Operation::Get(4),
]
.into_iter()
.for_each(|op| inner_submission_tx_0.send(op).unwrap());
submission_tx.send(inner_submission_rx_0).unwrap();
drop(inner_submission_tx_0);
// Send the second group of operations:
let (inner_submission_tx_1, inner_submission_rx_1) = crossbeam::channel::bounded(4);
vec![
Operation::Get(6),
Operation::Get(7),
Operation::Get(8),
Operation::Get(9),
]
.into_iter()
.for_each(|op| inner_submission_tx_1.send(op).unwrap());
submission_tx.send(inner_submission_rx_1).unwrap();
drop(inner_submission_tx_1);
// Hang up the submission_tx, otherwise we'll never finish!
drop(submission_tx);
// "Process" the submission queue, and send data to the completion queue:
submission_rx.into_iter().par_bridge().for_each(|inner| {
let (inner_completion_tx, inner_completion_rx) =
crossbeam::channel::bounded::<Operation>(4);
inner
.into_iter()
.par_bridge()
.for_each(|Operation::Get(x)| {
inner_completion_tx.send(Operation::Get(x * 10)).unwrap()
});
completion_tx.send(inner_completion_rx).unwrap();
});
}
drop(completion_tx);
completion_rx.into_iter().for_each(|inner| {
println!("GROUP:");
inner.into_iter().for_each(|op| println!("{op:?}"));
});
}
From: https://github.com/JackKelly/rust-playground/blob/493da54d3f5bd2373501374f7cc71d70c7c8de4a/groups-within-iterators/src/main.rs