differential-datalog
First several profiling messages are lost.
The ddlog command file can have cpu profile on anywhere in it. Therefore, the closure passed to log_register checks the variable profcpu on every invocation, since this value may be updated at any time.
worker.log_register().insert::<TimelyEvent,_>("timely", move |_time, data| {
let profcpu: &AtomicBool = &*profile_cpu3;
...
However, even when putting cpu profile on as the first line in the command file:
> more souffle.dat
profile timely on;
start;
There is a small window of time between this command being parsed and the new profcpu value being read, during which timely events are dropped.
This can be observed by printing the arriving data and the current value of profcpu:
println!("data {:?}", data);
let profcpu: &AtomicBool = &*profile_cpu3;
println!("profile cpu {:?}", profcpu);
We see:
profile cpu false
Data [(640.450601ms, 0, Schedule(ScheduleEvent { id: 0, start_stop: Start })), (642.559841ms, 0, Schedule(ScheduleEvent { id: 117, start_stop: Start })), (642.59014ms, 0, Schedule(ScheduleEvent { id: 117, start_stop: Stop })), (642.600566ms, 0, PushProgress(PushProgressEvent { op_id: 121 })), (642.602353ms, 0, Schedule(ScheduleEvent { id: 121, start_stop: Start }))...
Because of the filtering in this closure, these events are filtered out and never logged.
The first logging events emitted by a timely dataflow are Operates events, produced while the graph is being created. Fortunately, these are not dropped, because the filter always returns true for Operates events: TimelyEvent::Operates(_) => true,
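To make the dropped-event window concrete, here is a minimal sketch of the filtering behaviour described above. It is not the ddlog code: the Event enum is a hypothetical stand-in for TimelyEvent, and keep and run are illustrative names. It only assumes what the issue states, namely that Operates always passes and everything else passes only once the flag has been set.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical stand-in for timely's TimelyEvent; only the shape matters here.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Operates(usize),
    Schedule(usize),
    Park,
}

// Mirrors the filtering described above: Operates always passes,
// everything else passes only while the profiling flag is set.
fn keep(event: &Event, profcpu: &AtomicBool) -> bool {
    match event {
        Event::Operates(_) => true,
        _ => profcpu.load(Ordering::Acquire),
    }
}

// Feeds events through the filter, turning the flag on just before the
// event at index `enable_at` is processed. Events that arrive earlier
// see the flag as false, which models the window in this issue.
fn run(events: &[Event], enable_at: usize) -> Vec<Event> {
    let profcpu = AtomicBool::new(false);
    let mut logged = Vec::new();
    for (i, ev) in events.iter().enumerate() {
        if i == enable_at {
            profcpu.store(true, Ordering::Release);
        }
        if keep(ev, &profcpu) {
            logged.push(ev.clone());
        }
    }
    logged
}
```

Calling run with enable_at greater than zero loses every non-Operates event before that index, including a Park whose matching Unpark arrives later. Calling it with enable_at equal to zero loses nothing.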
Here is a concrete example of how this bug causes problems. To calculate Park event intervals, I store the start times in a hash table and pop them later:
TimelyEvent::Park(ParkEvent::Park(None)) => {
self.park_start_time.new_start_time(worker_index, timestamp);
}
TimelyEvent::Park(ParkEvent::Unpark) => {
let start_time = self.park_start_time.pop_start_time(&worker_index);
self.csv_writer.serialize(CSVLogEvent::park_entry(worker_index, &start_time, &timestamp))
.expect("unable to serialize record");
}
However, I'm currently seeing an Unpark event without a corresponding Park event at the very beginning of this program. I'm pretty sure the Park was dropped due to this bug.
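As an illustration of the symptom, the sketch below shows one way a start-time table could expose an unmatched Unpark instead of hiding it. The real park_start_time type is not shown in this issue, so ParkTimes, its fields, and park_interval are hypothetical names, and the Option return type is an assumption rather than the existing behaviour.

```rust
use std::collections::HashMap;
use std::time::Duration;

// Hypothetical tracker; the actual park_start_time type is not shown here.
#[derive(Default)]
struct ParkTimes {
    // Most recent Park timestamp per worker index.
    starts: HashMap<usize, Duration>,
}

impl ParkTimes {
    // Records the timestamp of a Park event for the given worker.
    fn new_start_time(&mut self, worker: usize, ts: Duration) {
        self.starts.insert(worker, ts);
    }

    // Removes and returns the recorded Park timestamp, or None when
    // the Park for this worker was never seen (for example, dropped).
    fn pop_start_time(&mut self, worker: usize) -> Option<Duration> {
        self.starts.remove(&worker)
    }

    // Returns (start, end) for an Unpark at `ts`, or None when the
    // Unpark has no matching Park.
    fn park_interval(&mut self, worker: usize, ts: Duration) -> Option<(Duration, Duration)> {
        self.pop_start_time(worker).map(|start| (start, ts))
    }
}
```

With this shape, an Unpark that arrives first yields None, so the caller can decide whether to skip the record or report it rather than serializing an interval with a made-up start.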
This might also explain why this code is currently needed in profile.rs:
fn handle_cpu_profiling(...) {
....
let start = self.starts.get(&(*id,worker_id)).cloned().unwrap_or_else(||{
eprintln!("TimelyEvent::Stop without a start for operator {}, worker {}", *id, worker_id);
Duration::new(0,0)
});
....