Q: Decouple IO and parsing using a channel
I need to parse a set of log files and combine them into a single output. We have a few gigabytes of logs, so it takes a few minutes to complete on a single thread. I looked into how I could do this with crossbeam.
My solution spawns one IO thread per file, and each IO thread sends lines into a channel. The CPU threads consume those lines as they become available.
Each IO thread decrements a shared counter when it terminates. Once the counter reaches zero (no files remain) and the IO pipeline is empty, a CPU thread may terminate.
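A common alternative to a hand-rolled counter is to let the channel signal completion itself: each IO thread owns a clone of the sender, the original sender is dropped, and a receive fails only once the queue is empty and every sender is gone. crossbeam's `Receiver::recv` and `Receiver::iter` follow these disconnection rules, and crossbeam receivers can be cloned. The sketch below is an assumption-laden illustration, not the author's code: to stay dependency-free it uses std's `sync_channel` and `thread::scope`, shares the single-consumer std `Receiver` behind a `Mutex`, and the names `pipeline`, the in-memory "files" and the `u32` "parser" are hypothetical stand-ins for `parse_logs`, the log files and `parse_entry`. The bounded channel also caps how many lines sit in memory, which an `unbounded` channel does not.

```rust
use std::sync::mpsc;
use std::sync::Mutex;
use std::thread;

/// Hypothetical stand-in for `parse_logs`: each inner Vec plays the role of one log file,
/// and "parsing" a line means `str::parse::<u32>`. Returns (parsed values, failed lines).
fn pipeline(files: Vec<Vec<String>>, workers: usize) -> (Vec<u32>, Vec<String>) {
    // Bounded queue: an IO thread blocks in `send` while 1024 lines are already waiting.
    let (tx, rx) = mpsc::sync_channel::<String>(1024);
    // std's Receiver is single-consumer, so the workers share it behind a Mutex
    // (a crossbeam Receiver could simply be cloned instead).
    let rx = Mutex::new(rx);
    let rx = &rx;
    let mut parsed = Vec::new();
    let mut failed = Vec::new();

    thread::scope(|s| {
        for file in files {
            let tx = tx.clone(); // each IO thread owns one sender
            s.spawn(move || {
                for line in file {
                    // `send` only fails if the receiver is gone; nothing useful to do then.
                    let _ = tx.send(line);
                }
                // `tx` is dropped when this thread ends: that is the "file done" signal.
            });
        }
        // Drop the original sender so the channel disconnects once every IO thread ends.
        drop(tx);

        let handles: Vec<_> = (0..workers.max(1))
            .map(|_| {
                s.spawn(move || {
                    let (mut ok, mut bad) = (Vec::new(), Vec::new());
                    loop {
                        // The guard is a temporary, so the lock is released before parsing.
                        let msg = rx.lock().unwrap().recv();
                        // Err only once the queue is empty AND every sender is dropped.
                        let Ok(line) = msg else { break };
                        match line.trim().parse::<u32>() {
                            Ok(v) => ok.push(v),
                            Err(_) => bad.push(line),
                        }
                    }
                    (ok, bad)
                })
            })
            .collect();

        // Each worker returns its own results, so no shared result queue is needed.
        for h in handles {
            let (ok, bad) = h.join().unwrap();
            parsed.extend(ok);
            failed.extend(bad);
        }
    });
    (parsed, failed)
}
```

With this shape there is no counter, no polling and no sleep: workers block in `recv` until a line arrives or the channel is closed, and no line sent before the last sender is dropped can be missed.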
```rust
use std::fs;
use std::io::{BufRead, BufReader};
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use chrono::NaiveDateTime;
use closure::closure;
use crossbeam::channel::unbounded;
use crossbeam::queue::SegQueue;
use crossbeam::thread;

// `Entry` and `parse_entry` (nom-based) are defined elsewhere.

pub fn parse_logs(logs: Vec<(PathBuf, NaiveDateTime)>) -> (Vec<Entry>, Vec<String>) {
    let (sender, receiver) = unbounded(); // work queue fed by io, consumed by cpu
    let io_remaining = Arc::new(AtomicUsize::new(logs.len())); // counts open io threads
    let messages = Arc::new(SegQueue::new()); // result messages
    let errors = Arc::new(SegQueue::new()); // parse failures
    thread::scope(|scope| {
        for (log, date) in logs {
            scope.spawn(closure!(clone io_remaining, clone sender, |_| {
                if let Ok(file) = fs::File::open(log) {
                    let reader = BufReader::new(file);
                    for line in reader.lines().flatten() {
                        // collect log information for parser
                        _ = sender.send((line, date.date()));
                    }
                }
                io_remaining.fetch_sub(1, Ordering::SeqCst);
            }));
        }
        // leave one core for IO, but always spawn at least one parser thread
        let workers = num_cpus::get().saturating_sub(1).max(1);
        for _ in 0..workers {
            scope.spawn(closure!(clone io_remaining, clone messages, clone errors, clone receiver, |_| {
                loop {
                    // Read the counter *before* draining: if it was already 0, every
                    // send has completed, so this drain sees all remaining lines.
                    let io_done = io_remaining.load(Ordering::SeqCst) == 0;
                    while let Ok((line, date)) = receiver.try_recv() {
                        // parse collection information
                        if let Ok((_, entry)) = parse_entry::<()>(date)(&line) {
                            messages.push(entry);
                        } else {
                            errors.push(line.clone());
                        }
                    }
                    if io_done {
                        break;
                    }
                    std::thread::yield_now(); // let the io threads run instead of busy-spinning
                }
            }));
        }
    })
    .unwrap();
    (collect_segq(messages), collect_segq(errors))
}

fn collect_segq<T, Q: Deref<Target = SegQueue<T>>>(q: Q) -> Vec<T> {
    let mut clone = Vec::with_capacity(q.len());
    while let Some(item) = q.pop() {
        clone.push(item);
    }
    clone
}
```
The implementation of `parse_entry` uses nom for parsing.
I am adding an output channel to a database, instead of collecting the results in memory.
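For the database output mentioned above, one common shape is a fan-in: every parser thread gets a clone of an output sender, and a single writer thread owns the connection and inserts rows in batches; the writer's loop ends once the last parser drops its sender. The sketch below is only an illustration under assumptions: `FakeDb::insert_batch` is a hypothetical placeholder for a real database client, the parsed rows are plain `u32`s, and it uses `std::sync::mpsc`, which is multi-producer/single-consumer and therefore fits this stage without crossbeam.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a database client; it just records each inserted batch.
struct FakeDb {
    batches: Vec<Vec<u32>>,
}

impl FakeDb {
    fn insert_batch(&mut self, rows: &[u32]) {
        self.batches.push(rows.to_vec());
    }
}

/// Each inner Vec is the output of one parser thread; a single writer thread
/// receives all rows over one channel and inserts them in batches of `batch_size`.
fn fan_in_to_writer(per_worker: Vec<Vec<u32>>, batch_size: usize) -> FakeDb {
    let batch_size = batch_size.max(1);
    let (out_tx, out_rx) = mpsc::channel::<u32>();

    thread::scope(|s| {
        // The writer is the only consumer and the only owner of the "connection".
        let writer = s.spawn(move || {
            let mut db = FakeDb { batches: Vec::new() };
            let mut batch = Vec::with_capacity(batch_size);
            // The loop ends once every parser's sender has been dropped.
            for row in out_rx {
                batch.push(row);
                if batch.len() == batch_size {
                    db.insert_batch(&batch);
                    batch.clear();
                }
            }
            if !batch.is_empty() {
                db.insert_batch(&batch); // flush the last, partial batch
            }
            db
        });

        for rows in per_worker {
            let out_tx = out_tx.clone(); // one sender per parser thread
            s.spawn(move || {
                for row in rows {
                    let _ = out_tx.send(row);
                }
            });
        }
        drop(out_tx); // otherwise the writer would wait forever

        writer.join().unwrap()
    })
}
```

Batching keeps the number of round trips to the database down; swapping `mpsc::channel` for a bounded `mpsc::sync_channel` would additionally stop parsed rows from piling up in memory when the database is the slowest stage.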
I am quite unsure whether I am using these components as intended, or whether there is a simpler solution for an IO-to-CPU pipeline than this.
All input is appreciated! Cheers