
Q: Decouple IO and parsing using a channel

Open ProphetLamb opened this issue 2 years ago • 0 comments

I need to parse a set of log files and combine them into a single output. We have a few gigabytes of logs, so a single-threaded run takes a few minutes. I looked into how I could parallelize this using crossbeam.

My solution spawns one IO thread per file, and each one sends its lines into a channel. CPU threads take lines from that channel and parse them as they become available.
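If the IO threads read faster than the CPU threads parse, an unbounded channel (such as the `unbounded()` queue in the code) lets the backlog grow until much of the input sits in memory. A bounded channel caps that backlog. With crossbeam this is `bounded(cap)`, and with std it is `mpsc::sync_channel(cap)`: once `cap` lines are queued, `send` blocks the IO thread until a parser catches up. The small std-only sketch below uses the non-blocking `try_send` to show where a blocking `send` would wait. `fill_bounded` is a hypothetical helper.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Hypothetical helper: how many lines a channel bounded to `cap` accepts
/// while no parser is receiving. A blocking `send` would wait at that point.
pub fn fill_bounded(cap: usize) -> usize {
    // `_rx` keeps the receiving side alive (a plain `_` would drop it at once)
    let (tx, _rx) = mpsc::sync_channel::<String>(cap);
    let mut accepted = 0;
    loop {
        match tx.try_send(format!("line {accepted}")) {
            Ok(()) => accepted += 1,
            // queue full: a blocking `send` would park the IO thread here
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}
```

With `send` instead of `try_send`, memory use stays at roughly `cap` lines, whatever the input size.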

When an IO thread finishes, it decrements a shared counter of open files. Once the counter reaches zero (no files remain) and the channel is empty, a CPU thread may terminate.
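The channel itself can provide this termination signal. After the last `Sender` is dropped, `recv` keeps handing out buffered messages and only returns an error once the queue is also empty. That matches the "no files remain and the pipeline is empty" condition exactly, and both std's `mpsc` and crossbeam-channel behave this way. Here is a minimal std-only check, using a hypothetical helper `drain_after_disconnect`:

```rust
use std::sync::mpsc;

/// Hypothetical helper: send `n` lines, drop the only Sender, then count what
/// `recv` still delivers before it reports disconnection.
pub fn drain_after_disconnect(n: usize) -> usize {
    let (tx, rx) = mpsc::channel::<String>();
    for i in 0..n {
        tx.send(format!("line {i}")).unwrap();
    }
    drop(tx); // last Sender gone: no more lines can arrive
    let mut received = 0;
    // buffered lines are still delivered; Err comes only once the queue is empty
    while rx.recv().is_ok() {
        received += 1;
    }
    received
}
```

If each IO thread owns a clone of the `Sender` and the original is dropped after spawning them, the parsers can simply block in `recv` instead of polling a counter.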

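use std::fs;
use std::io::{BufRead, BufReader};
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use chrono::NaiveDateTime;
use closure::closure;
use crossbeam::channel::unbounded;
use crossbeam::queue::SegQueue;
use crossbeam::thread;

// `Entry` and the nom-based `parse_entry` are defined elsewhere.
pub fn parse_logs(logs: Vec<(PathBuf, NaiveDateTime)>) -> (Vec<Entry>, Vec<String>) {
    let (sender, receiver) = unbounded(); // work queue fed by io, consumed by cpu
    let io_remaining = Arc::new(AtomicUsize::new(logs.len())); // counts open io threads
    let messages = Arc::new(SegQueue::new()); // result messages
    let errors = Arc::new(SegQueue::new()); // parse failures
    thread::scope(|scope| {
        for (log, date) in logs {
            scope.spawn(closure!(clone io_remaining, clone sender, |_| {
                if let Ok(file) = fs::File::open(log) {
                    let reader = BufReader::new(file);
                    // map_while stops at the first read error; flatten() can spin forever if the reader keeps failing
                    for line in reader.lines().map_while(Result::ok) {
                        // collect log information for parser
                        _ = sender.send((line, date.date()));
                    }
                }
                // every send above happens-before this decrement
                io_remaining.fetch_sub(1, Ordering::SeqCst);
            }));
        }
        // leave one core for io, but always spawn at least one parser
        let parsers = num_cpus::get().saturating_sub(1).max(1);
        for _ in 0..parsers {
            scope.spawn(closure!(clone io_remaining, clone messages, clone errors, clone receiver, |_| {
                loop {
                    // read the counter *before* draining: if it was already zero, every
                    // line has been sent, so this last drain cannot miss any of them
                    let io_done = io_remaining.load(Ordering::SeqCst) == 0;
                    while let Ok((line, date)) = receiver.try_recv() {
                        // parse collection information
                        if let Ok((_, entry)) = parse_entry::<()>(date)(&line) {
                            messages.push(entry);
                        } else {
                            errors.push(line.clone());
                        }
                    }
                    if io_done {
                        break;
                    }
                    std::thread::yield_now(); // give the io threads a chance to refill the queue
                }
            }));
        }
    }).unwrap();
    (collect_segq(messages), collect_segq(errors))
}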

fn collect_segq<T, Q: Deref<Target = SegQueue<T>>>(q: Q) -> Vec<T> {
    let mut clone = Vec::with_capacity(q.len());
    while let Some(item) = q.pop() {
        clone.push(item);
    }
    clone
}
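For comparison, here is a sketch of the same IO/CPU split with neither the counter nor the `SegQueue`s. It uses only the standard library (Rust 1.63+ for `std::thread::scope`) so it runs without extra crates. It relies on several assumptions and is not a drop-in replacement. `parse_pipeline`, `inputs`, `workers` and `parse` are hypothetical names: `inputs` stands in for files opened with `BufReader::new(File::open(path)?)`, `parse` stands in for `parse_entry`, and the per-file `NaiveDate` is left out. Workers stop when `recv` fails, and each one returns its results through its own join handle.

```rust
use std::io::BufRead;
use std::sync::{mpsc, Mutex};
use std::thread;

/// Hypothetical `parse_pipeline`: one reader thread per input, `workers` parser threads.
pub fn parse_pipeline<R, T, F>(inputs: Vec<R>, workers: usize, parse: F) -> (Vec<T>, Vec<String>)
where
    R: BufRead + Send,
    T: Send,
    F: Fn(&str) -> Option<T> + Sync,
{
    let (tx, rx) = mpsc::channel::<String>();
    // std's Receiver is single-consumer, so the parser threads share it behind a Mutex
    let rx = Mutex::new(rx);
    thread::scope(|s| {
        // scoped threads may borrow locals, so no Arc is needed
        let rx = &rx;
        let parse = &parse;
        for input in inputs {
            let tx = tx.clone(); // every IO thread owns its own Sender
            s.spawn(move || {
                for line in input.lines().map_while(Result::ok) {
                    let _ = tx.send(line);
                }
            }); // this thread's Sender is dropped when it finishes
        }
        // drop the original Sender: the channel closes once the last IO thread ends
        drop(tx);

        let handles: Vec<_> = (0..workers.max(1))
            .map(|_| {
                s.spawn(move || {
                    let (mut ok, mut bad) = (Vec::new(), Vec::new());
                    loop {
                        // lock only for the recv; the guard is dropped at the end of this statement
                        let msg = rx.lock().unwrap().recv();
                        // Err means every Sender is gone *and* the queue is empty
                        let line = match msg {
                            Ok(line) => line,
                            Err(_) => break,
                        };
                        match parse(&line) {
                            Some(v) => ok.push(v),
                            None => bad.push(line),
                        }
                    }
                    (ok, bad)
                })
            })
            .collect();

        // merge the per-worker results (order across workers is not preserved)
        let (mut ok, mut bad) = (Vec::new(), Vec::new());
        for h in handles {
            let (o, b) = h.join().unwrap();
            ok.extend(o);
            bad.extend(b);
        }
        (ok, bad)
    })
}
```

With crossbeam-channel the `Mutex` is unnecessary, because its `Receiver` is `Clone` and multi-consumer. Each worker could hold its own clone and loop with `for line in receiver.iter()`.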

The implementation of parse_entry uses nom for parsing.

I am now adding an output channel that writes to a database, instead of collecting the results in memory.
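One way to structure that output stage: parser threads send entries into a second channel, and a single writer thread owns the database connection and inserts in batches. The sketch below makes several assumptions. `write_in_batches` and `insert_batch` are hypothetical, `insert_batch` stands in for whatever database call is used, and the loop over `entries` stands in for the parser threads.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical writer stage: entries arrive over a second channel and a single
/// writer thread hands them to `insert_batch` in groups of `batch_size`.
pub fn write_in_batches<T, F>(entries: Vec<T>, batch_size: usize, mut insert_batch: F)
where
    T: Send,
    F: FnMut(Vec<T>) + Send,
{
    let batch_size = batch_size.max(1);
    let (out_tx, out_rx) = mpsc::channel::<T>();
    thread::scope(|s| {
        // the single writer would own the (hypothetical) database connection
        s.spawn(move || {
            let mut batch = Vec::with_capacity(batch_size);
            // the loop ends once every Sender is dropped and the queue is drained
            for entry in out_rx {
                batch.push(entry);
                if batch.len() == batch_size {
                    insert_batch(std::mem::replace(&mut batch, Vec::with_capacity(batch_size)));
                }
            }
            if !batch.is_empty() {
                insert_batch(batch); // flush the final partial batch
            }
        });
        // stand-in for the parser threads, which would each hold a clone of `out_tx`
        for entry in entries {
            if out_tx.send(entry).is_err() {
                break; // the writer has stopped
            }
        }
        drop(out_tx);
    });
}
```

Keeping a single writer avoids sharing the connection between threads, and batching reduces the number of round trips to the database.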

I am quite unsure whether I am using the components as intended, or whether there is a simpler way to build an IO-to-CPU pipeline than this.
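Because every file is parsed independently, an even simpler structure may be enough: each file's thread reads *and* parses its own lines, and the results are concatenated after joining, so no channel, counter or shared queue is needed. The trade-off is load balance. One unusually large file keeps only one core busy, and very many files mean very many threads. The following is a std-only sketch, where `parse_per_file` and `parse` are hypothetical names and `inputs` stand in for opened log files.

```rust
use std::io::BufRead;
use std::thread;

/// Hypothetical `parse_per_file`: each input is read and parsed on its own
/// scoped thread; `parse` stands in for `parse_entry`.
pub fn parse_per_file<R, T, F>(inputs: Vec<R>, parse: F) -> (Vec<T>, Vec<String>)
where
    R: BufRead + Send,
    T: Send,
    F: Fn(&str) -> Option<T> + Sync,
{
    thread::scope(|s| {
        let parse = &parse;
        let handles: Vec<_> = inputs
            .into_iter()
            .map(|input| {
                s.spawn(move || {
                    let (mut ok, mut bad) = (Vec::new(), Vec::new());
                    for line in input.lines().map_while(Result::ok) {
                        match parse(&line) {
                            Some(v) => ok.push(v),
                            None => bad.push(line),
                        }
                    }
                    (ok, bad)
                })
            })
            .collect();
        // join in input order, so results keep the order of `inputs`
        let (mut ok, mut bad) = (Vec::new(), Vec::new());
        for h in handles {
            let (o, b) = h.join().unwrap();
            ok.extend(o);
            bad.extend(b);
        }
        (ok, bad)
    })
}
```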

All input is appreciated! Cheers

ProphetLamb, May 24 '22 13:05