light-speed-io icon indicating copy to clipboard operation
light-speed-io copied to clipboard

What would user's code would look like the for simple use-cases? (After dropping `async`, and instead only using `channels` and `Rayon`)?

Open JackKelly opened this issue 11 months ago • 9 comments

For reference: Here's a quick code sketch I wrote, back when I was planning to use Tokio with Rayon.

Use cases

1) File conversion & re-chunking

  1. Send LSIO a list of millions of chunks to read.
  2. Have a Rayon threadpool which decompresses those chunks as soon as they arrive from disk. Tell the IO code to re-use IO buffers as soon as the data has been decompressed from an IO buffer.
  3. Have a thread which "reduces" and creates output chunks. Let's say that each output chunk contains 10 input chunks.
  4. Rayon threadpool compresses those chunks.
  5. And writes those compressed chunks back into the io_uring channel for writing to disk.

2) ML training: Load random crops of Zarr data from disk.

3) Vincent's use-case: Loading timeseries (added 2024-04-04)

Imagine the chunks are arranged like this:

0.0   0.1   0.2   0.3
1.0   1.1   1.2   1.3
2.0   2.1   2.2   2.3
3.0   3.1   3.2   3.3

Each row is a timeseries. So, for example, chunks 0.0, 0.1, 0.2, 0.3 have to be given to the user in order.

But the decompression could happen out-of-order.

Perhaps the solution is for the user to group each row. And then, within a row, decompress chunks in any order. And then re-order the decompressed chunks before delivering to the user?

JackKelly avatar Mar 15 '24 11:03 JackKelly

UPDATE: This attempt is pretty dumb :slightly_smiling_face:. See the comment below for a new, improved approach!


Here's an attempt at making use-case 1 work:

Inputs:

  • src_filenames: Receiver<Vec<CString>>: The million files to load, divided into 100,000 Vec<CString>s, where each Vec<CString> is a single group. Group n will finish loading before loading group n+1.
const MAX_N_BUFFERS: usize = 1024;
let mut uring_local = IoUringLocal::new(MAX_N_BUFFERS);

// Start loading the files in a separate threadpool. Completed buffers will
// appear in `rx_of_filled_buffers`. We apply back-pressure by either letting
// `rx_of_filled_buffers` fill to `MAX_N_BUFFERS`, or by not telling `uring_local`
// that it can re-use buffers. This will not block. This will start loading immediately.
let rx_of_filled_buffers = uring_local.get(src_filenames);

// Decompress buffers:
// This will run `decompression_function` on each chunk using separate Rayon threadpool:
let decompressor = ChannelProcessor::new();
let rx_of_decompressed_buffers = decompressor.map(rx_of_filled_buffers, decompression_function);

// Tell io_uring to re-use IO buffers as soon as decompression has finished for each chunk:
// This function passes-through each completed chunk:
rx_of_decompressed_buffers = uring_local.reuse_buffers(rx_of_decompressed_buffers);

// Reduce:
let rx_of_reduced_buffers = reduce(rx_of_decompressed_buffers, 10);

// Compress:
let rx_of_compressed_output_buffers = compressor.map(rx_of_reduced_buffers, compression_function);

// Write. Behind the scenes, this will be added to the same uring threadpool's list of operations, I guess?
uring_local.put(rx_of_compressed_output_buffers);

JackKelly avatar Mar 15 '24 12:03 JackKelly

Writing the code in the comment above give me an idea that I've summarised in issue #106

JackKelly avatar Mar 15 '24 12:03 JackKelly

UPDATE 2024-04-03: The code below is out-of-date! Please see this code sketch for the latest ideas: https://github.com/JackKelly/light-speed-io/blob/new-design-March-2024/src/new_design_march_2024.rs


Let me try again, now that I've figured out that we can just use iterators (see this code sketch)!

Structs

struct ByteRange<M> {
    byte_range: Range<isize>,

    // metadata is used to identify this byte range.
    // For example, in Zarr, this would be used to identify the
    // location at which this chunk appears in the merged array.
    metadata: M,
}

enum OperationKind {
    GetRanges,
    PutRanges,
}

struct Operation<M> {
    operation_kind: OperationKind,
    buffers: Option<Vec<AlignedBuffer>>, // For PutRanges
    byte_ranges: Option<Vec<ByteRange<M>>>,  // For GetRanges and PutRanges
    filename: CString,  // For GetRanges and PutRanges
}

impl<M> Operation<M> {
    /// If the user submits a GetRanges operation with an invalid filename then
    /// the user will receive a single Err(std::io::ErrorKind::NotFound) with context
    /// that describes the filename that failed.
    /// If a subset of the `byte_ranges` results in an error (e.g. reading beyond
    /// end of the file) then the user will receive a mixture of `Ok(Output::Buffer)`
    /// and `Err`, where the `Err` will include context such as the filename and byte_range.
    fn get_ranges(filename, byte_ranges) -> Self<M> {
    }

    fn put_ranges(filename, byte_ranges, buffers) -> Self<M> {
        // TODO: Maybe we also need a `slices: &[u8]` field, which gives one slice
        // per `byte_range`, whilst also having a `buffers` field to own the `AlignedBuffer`.
    }
}

struct OpGroup<M> {
    operations: Receiver<Operation<M>>,

    // Metadata for the whole group. Such as the filename of the merged output.
    metadata: M,
}

struct Output<M> {
    // Each `byte_range` within an `Operation::GetRanges` returns a `Buffer`.
    operation_kind: OperationKind,
    buffer: Option<AlignedBuffer>,
    byte_range: Option<ByteRange<M>>,
}

struct OutputGroup<GROUPMETA, OUTPUTMETA> {
    // We use a `Receiver` so we can process the next `Buffer` as soon as the producing
    // thread finishes each `Buffer`:
    // 
    outputs: Receiver<Result<Output<OUTPUTMETA>>>,

    // Metadata for the group (e.g. the output filename).
    metadata: <GROUPMETA>,
}

User code

const MAX_N_BUFFERS: usize = 1024;
let mut uring_local = IoUringLocal::new(MAX_N_BUFFERS);

let mut submission_queue: Sender<OpGroup> = uring_local.submission();

// Define operations to get a bunch of files:
let get_group_0 = OpGroup::new()
    .extend(!vec[
        Operation::get_ranges("foo.0.0", 0..-1),
        Operation::get_ranges("foo.0.1", 0..-1),
    ])
    .metadata(OutputFilename("foo_0"));

// Define operations to get a bunch of files:
let get_group_1 = OpGroup::new()
    .extend(!vec[
        Operation::get_ranges("foo.1.0", 0..-1),
        Operation::get_ranges("foo.1.1", 0..-1),
    ])
    .metadata(OutputFilename("foo_1"));

// Start loading the files in a separate threadpool:
submission_queue.send(get_group_0).unwrap();
submission_queue.send(get_group_1).unwrap();

// uring_local will load all operations from `get_group_0`. And then from `get_group_1`.
// Now we can wait on the completed items.

let completion_queue: Receiver<OutputGroup> = uring_local.completion();

let mut buffer_recycling_queue = uring_local.buffer_recycling_queue();

completion_queue.into_iter().par_bridge().for_each(|output_group: OutputGroup| {
    let out = output_group.outputs.into_iter().par_bridge()
        .map(|output| {
            assert_eq!(output.operation_kind, GetRanges);
            let decompressed = decompress(&output.buffer.unwrap());
            buffer_recycling_queue.send(output.buffer.take()).unwrap();
            decompressed
        })
        .reduce(reduce_func);
    let out = compress(out);

    // Write `out` to disk:
    let put_op = Operation::put_ranges(output_group.metadata.output_filename, 0..-1, out);
    let op_group = OpGroup::new().append(put_op);
    submission_queue.send(op_group);  // Does not block.
});

JackKelly avatar Mar 15 '24 14:03 JackKelly

Actually, I don't think it's acceptable for the output channel to be a Receiver<Vec<Chunk>>. Because we want to start processing chunks as soon as they're ready. Maybe a channel of channels?! Or a channel of crossbeam queues?

But we probably do want the input channel to be a Sender<Vec<IoOperation>>. Or - perhaps better - a Sender<IoOpGroup> (so we can attach metadata for the whole group).

UPDATE: I've updated the code in the comment above

JackKelly avatar Mar 15 '24 15:03 JackKelly

UPDATE 2024-04-03: The code above is out-of-date! Please see this code sketch for the latest ideas: https://github.com/JackKelly/light-speed-io/blob/new-design-March-2024/src/new_design_march_2024.rs

JackKelly avatar Apr 03 '24 14:04 JackKelly

I need to think more about what the user code would look like (to satisfy the use-cases above) with the new code sketch. Some questions that spring to mind:

  • Can we use something like a Sender<IoOpGroup> to send groups to the IO threadpool. So we can attach group-wide metadata (such as the final output filename for that whole group).
  • Or, can we use a flat structure, and every Chunk struct would have a group: Arc<GROUP>? Operations for a single group would be submitted together. The GROUP object would contain group metadata (such as the output filename).

JackKelly avatar Apr 03 '24 15:04 JackKelly

can we use a flat structure

We can't have a flat list, where list items define the end of groups, because Rayon processes things out-of-order. So the following code never seems to print 0, 1, 2. Instead it print 0, 1. Or even just 0.

use rayon::prelude::*;

#[derive(Debug)]
enum Operation {
    Get(u8),
    EndOfGroup(u8),
}

fn main() {
    let ops = vec![
        Operation::Get(0),
        Operation::Get(1),
        Operation::Get(2),
        Operation::EndOfGroup(0),
        Operation::Get(10),
        Operation::Get(11),
        Operation::EndOfGroup(1),
    ];

    // Oops: Rayon may process these items out-of-order. So we might start loading group 1 before
    // we hit the EndOfGroup(0).
    ops.into_par_iter()
        .map(|op| {
            if matches!(op, Operation::Get { .. }) {
                Some(op)
            } else {
                None
            }
        })
        .while_some()
        .for_each(|op| println!("{:?}", op));
}

So I think we may have to use a channel of channels for the completion queue!

JackKelly avatar Apr 16 '24 15:04 JackKelly

I should test the channel of channels idea in my Rust playground

JackKelly avatar Apr 16 '24 15:04 JackKelly

Yup, using channels of channels works for both submission and completion!

use rayon::prelude::*;

#[derive(Debug)]
enum Operation {
    Get(u8),
}

fn main() {
    let (completion_tx, completion_rx) = crossbeam::channel::bounded(4);

    {
        let (submission_tx, submission_rx) = crossbeam::channel::bounded(4);

        // Send the first group of operations:
        let (inner_submission_tx_0, inner_submission_rx_0) = crossbeam::channel::bounded(4);
        vec![
            Operation::Get(1),
            Operation::Get(2),
            Operation::Get(3),
            Operation::Get(4),
        ]
        .into_iter()
        .for_each(|op| inner_submission_tx_0.send(op).unwrap());
        submission_tx.send(inner_submission_rx_0).unwrap();
        drop(inner_submission_tx_0);

        // Send the second group of operations:
        let (inner_submission_tx_1, inner_submission_rx_1) = crossbeam::channel::bounded(4);
        vec![
            Operation::Get(6),
            Operation::Get(7),
            Operation::Get(8),
            Operation::Get(9),
        ]
        .into_iter()
        .for_each(|op| inner_submission_tx_1.send(op).unwrap());
        submission_tx.send(inner_submission_rx_1).unwrap();
        drop(inner_submission_tx_1);

        // Hang up the submission_tx, otherwise we'll never finish!
        drop(submission_tx);

        // "Process" the submission queue, and send data to the completion queue:
        submission_rx.into_iter().par_bridge().for_each(|inner| {
            let (inner_completion_tx, inner_completion_rx) =
                crossbeam::channel::bounded::<Operation>(4);
            inner
                .into_iter()
                .par_bridge()
                .for_each(|Operation::Get(x)| {
                    inner_completion_tx.send(Operation::Get(x * 10)).unwrap()
                });
            completion_tx.send(inner_completion_rx).unwrap();
        });
    }

    drop(completion_tx);

    completion_rx.into_iter().for_each(|inner| {
        println!("GROUP:");
        inner.into_iter().for_each(|op| println!("{op:?}"));
    });
}

From: https://github.com/JackKelly/rust-playground/blob/493da54d3f5bd2373501374f7cc71d70c7c8de4a/groups-within-iterators/src/main.rs

JackKelly avatar Apr 16 '24 18:04 JackKelly