Add a job scheduler.
The scheduler is responsible for running the checks and getting the results.
A simple scheduler would look something like this in Python:
# The checks to execute, listed in the order they are run.
my_checks = [
    CheckThingA(),
    CheckThingB(),
    CheckThingC(),
]

# Run the checks one after another; `result` always holds the outcome of the
# most recently executed check.
for check in my_checks:
    result = pychecks.run(check)
However, depending on the context, we might be able to run the checks in multiple threads, async tasks, or processes. Alternatively, we may have a DAG scheduler, so that a job runs only if its parent jobs pass.
import asyncio
import croniter
import datetime
import logging
import random
import time
class Check:
    """A single scheduled check with optional dependencies and retry handling."""

    def __init__(self, name, cron, dependencies=None, max_retries=0, retry_delay=60):
        """
        A class representing a single check to be performed.

        Args:
            name (str): The name of the check.
            cron (str): A string in cron format that determines when the check should be run.
            dependencies (list of Check, optional): Other checks that must succeed before this check can run.
            max_retries (int, optional): The maximum number of times to retry the check if it fails.
                The check is always attempted once; ``max_retries`` counts the additional attempts.
            retry_delay (int, optional): The number of seconds to wait between retries.
        """
        self.name = name
        self.cron = cron
        self.dependencies = dependencies or []
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.current_retries = 0
        # Initialised up front so dependants can read `dep.success` before this
        # check has been attempted (previously that raised AttributeError).
        self.success = False
        # Reference instant for the cron schedule: the check becomes due at the
        # first cron occurrence strictly after this instant. It is moved forward
        # after every successful run.
        self._schedule_anchor = datetime.datetime.now()

    async def run(self):
        """
        Perform the check and return the result.
        """
        # simulate a delay to make the check take some time
        await asyncio.sleep(random.randint(1, 3))
        return f"{self.name} check result"

    def _is_eligible(self):
        """Return True when retries remain and every dependency has succeeded."""
        if self.current_retries > self.max_retries:
            # every allowed attempt (1 initial + max_retries retries) has failed
            return False
        return all(dep.success for dep in self.dependencies)

    def _next_due(self):
        """Return the first cron occurrence after the schedule anchor."""
        # croniter's get_next() is evaluated relative to its start time, so the
        # start must be a past reference instant rather than "now"; starting from
        # "now" yields a future time that can never be <= now.
        return croniter.croniter(self.cron, self._schedule_anchor).get_next(datetime.datetime)

    def should_run(self):
        """
        Determine whether the check should be run based on the current time and its dependencies.

        Returns:
            bool: True when retries remain, all dependencies succeeded and a cron
            occurrence has been reached since the last successful run (or since
            construction, if the check has not succeeded yet).
        """
        if not self._is_eligible():
            return False
        return self._next_due() <= datetime.datetime.now()

    async def run_with_retry(self):
        """
        Wait for the next scheduled occurrence, then run the check, retrying on failure.

        Returns:
            Check: this check, so callers awaiting the task can identify which check finished.
        """
        if not self._is_eligible():
            # do not wait for the schedule when the check could not run anyway
            return self

        # Sleep until the schedule says the check is due. Re-evaluated in a loop
        # because the event-loop clock and the wall clock may disagree slightly.
        while True:
            delay = (self._next_due() - datetime.datetime.now()).total_seconds()
            if delay <= 0:
                break
            await asyncio.sleep(delay)

        while self.should_run():
            try:
                result = await self.run()
            except Exception as e:
                self.success = False
                self.current_retries += 1
                if self.current_retries > self.max_retries:
                    logging.error(f"{self.name} check failed ({e}). No retries left.")
                    break
                logging.error(f"{self.name} check failed ({e}). Retrying in {self.retry_delay} seconds...")
                await asyncio.sleep(self.retry_delay)
            else:
                self.success = True
                # the next run is due at the first occurrence after this one
                self._schedule_anchor = datetime.datetime.now()
                logging.info(f"{self.name} check succeeded: {result}")
                break
        return self
class Scheduler:
    """Runs a set of checks concurrently while honouring their dependencies."""

    def __init__(self, checks):
        """
        A class representing a scheduler for running checks.

        Args:
            checks (list of Check): The list of checks to be run. Check names are
                used as keys, so they are expected to be unique.
        """
        self.checks = checks
        # create a dictionary of dependencies between checks
        self.dependencies = {c.name: c.dependencies for c in checks}

    async def run_checks(self):
        """
        Run the checks in parallel using asyncio.

        A check is started only once every dependency that is also managed by
        this scheduler has finished; whether it then actually executes is left
        to the check's own ``run_with_retry``. Dependencies that are not part of
        ``self.checks`` are not waited for.
        """
        scheduled_names = {c.name for c in self.checks}
        # name -> (check, names of scheduled dependencies that have not finished yet)
        waiting = {
            c.name: (c, {dep.name for dep in self.dependencies[c.name]} & scheduled_names)
            for c in self.checks
        }
        # task -> check, so a finished task can be mapped back to its check
        # without relying on the task's return value
        running = {}

        while waiting or running:
            # start every check whose dependencies have all finished
            ready = [name for name, (_, blockers) in waiting.items() if not blockers]
            for name in ready:
                check, _ = waiting.pop(name)
                running[asyncio.ensure_future(check.run_with_retry())] = check

            if not running:
                # nothing is running and nothing can start: the remaining checks
                # wait on each other
                logging.error(
                    f"Dependency cycle detected; these checks were not run: {sorted(waiting)}"
                )
                break

            # wait for the first task to complete
            done, _ = await asyncio.wait(list(running), return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                finished = running.pop(task)
                # unblock the checks that were waiting on this one
                for _, blockers in waiting.values():
                    blockers.discard(finished.name)
                if task.cancelled():
                    logging.error(f"{finished.name} check was cancelled")
                    continue
                error = task.exception()
                if error is not None:
                    logging.error(f"{finished.name} check crashed: {error!r}")

    def start(self):
        """
        Set up logging and start the event loop to run the checks.
        """
        # configure logging
        logging.basicConfig(level=logging.INFO)
        # asyncio.run creates a fresh event loop and closes it when finished
        asyncio.run(self.run_checks())
use chrono::{DateTime, Utc};
use cron::Schedule;
use futures::future::join_all;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
/// A single scheduled check together with its retry bookkeeping.
///
/// `Clone` is derived because `main` clones checks to build dependency lists.
/// Cloning copies the `Arc` handles in `dependencies`, so a clone shares its
/// dependencies with the original.
#[derive(Clone)]
struct Check {
    /// Name of the check, used in log output.
    name: String,
    /// Cron schedule of the check.
    cron: Schedule,
    /// Checks that must have succeeded before this one may run.
    dependencies: Vec<Arc<Mutex<Check>>>,
    /// Maximum number of retries after a failed attempt.
    max_retries: u32,
    /// Seconds to wait between retries.
    retry_delay: u64,
    /// Number of failed attempts so far.
    current_retries: u32,
    /// Whether the most recent attempt succeeded. Read by dependants and
    /// written by `run_with_retry`; `main` initialises it to `false`.
    success: bool,
}
impl Check {
async fn run(&self) -> String {
// perform the check and return the result
tokio::time::sleep(Duration::from_secs(rand::random::<u64>() % 3 + 1)).await;
format!("{} check result", self.name)
}
fn should_run(&self) -> bool {
if self.current_retries >= self.max_retries {
// don't retry the check if it has already exceeded the maximum number of retries
return false;
}
for dep in &self.dependencies {
if let Ok(dep) = dep.lock() {
if !dep.success {
// don't run the check if any of its dependencies have failed
return false;
}
}
}
// use chrono and cron to determine whether the check should run based on its cron schedule
self.cron.upcoming(Utc).next().unwrap_or_else(|| Utc.timestamp(0, 0)) <= Utc::now()
}
async fn run_with_retry(&mut self) {
// run the check with retries if it fails
while self.should_run() {
match self.run().await {
Ok(result) => {
self.success = true;
println!("{} check succeeded: {}", self.name, result);
}
Err(e) => {
self.success = false;
self.current_retries += 1;
println!(
"{} check failed ({:?}). Retrying in {} seconds...",
self.name, e, self.retry_delay
);
tokio::time::sleep(Duration::from_secs(self.retry_delay)).await;
}
}
}
}
}
struct Scheduler {
checks: Vec<Arc<Mutex<Check>>>,
dependencies: HashMap<String, Vec<Arc<Mutex<Check>>>>,
}
impl Scheduler {
fn new(checks: Vec<Check>) -> Result<Scheduler, Box<dyn Error>> {
let checks = checks
.into_iter()
.map(|c| Arc::new(Mutex::new(c)))
.collect();
let dependencies = checks
.iter()
.map(|c| (c.lock().unwrap().name.clone(), vec![]))
.collect();
let mut scheduler = Scheduler {
checks,
dependencies,
};
scheduler.update_dependencies()?;
Ok(scheduler)
}
fn update_dependencies(&mut self) -> Result<(), Box<dyn Error>> {
for check in &self.checks {
let mut check = check.lock().unwrap();
check.dependencies = self
.dependencies
.get(&check.name)
.unwrap_or(&vec![])
.clone();
}
Ok(())
}
async fn run_checks(&mut self) -> Result<(), Box<dyn Error>> {
let mut futures = vec![];
for check in &self.checks {
let check = check.clone();
let fut = async move {
let mut check = check.lock().unwrap();
check.run_with_retry().await;
};
futures.push(fut);
}
join
all(futures).await;
Ok(())
}
fn start(&mut self) -> Result<(), Box<dyn Error>> {
// start the scheduler and run the checks
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
rt.block_on(self.run_checks())?;
Ok(())
}
}
/// Builds three example checks (B depends on A, C depends on A and B) and
/// runs them through the scheduler.
fn main() -> Result<(), Box<dyn Error>> {
    // The checks are built one at a time: a `vec![...]` literal cannot refer
    // to its own earlier elements while it is still being constructed.
    let check_a = Check {
        name: "CheckThingA".into(),
        cron: "*/5 * * * * * *".parse()?,
        dependencies: vec![],
        max_retries: 3,
        retry_delay: 1,
        current_retries: 0,
        success: false,
    };
    // NOTE(review): each dependency below wraps a *clone*, i.e. a separate
    // `Check` value from the one handed to the scheduler. Its `success` flag
    // only follows the real run if the scheduler links dependencies to the
    // scheduled instances (for example by name) — confirm.
    let check_b = Check {
        name: "CheckThingB".into(),
        cron: "*/10 * * * * * *".parse()?,
        dependencies: vec![Arc::new(Mutex::new(check_a.clone()))],
        max_retries: 2,
        retry_delay: 2,
        current_retries: 0,
        success: false,
    };
    let check_c = Check {
        name: "CheckThingC".into(),
        cron: "*/15 * * * * * *".parse()?,
        dependencies: vec![
            Arc::new(Mutex::new(check_a.clone())),
            Arc::new(Mutex::new(check_b.clone())),
        ],
        max_retries: 1,
        retry_delay: 3,
        current_retries: 0,
        success: false,
    };

    let mut scheduler = Scheduler::new(vec![check_a, check_b, check_c])?;
    scheduler.start()?;
    Ok(())
}
Hey, could you make this into a pull request? This looks like it may work, but it would be easier to review as a PR.
I did some thinking on this, and I have a rough design that might work.
Rust
/// Design sketch: a scheduler takes ownership of a list of checks and returns
/// each check paired with its `CheckResult`.
trait Scheduler {
/// Runs `checks` and returns `(check, result)` pairs.
// NOTE(review): whether the output order matches the input order is not specified here — confirm.
fn run(&self, checks: Vec<Box<dyn Check>>) -> Vec<(Box<dyn Check>, CheckResult)>;
}
/// A scheduler configured with a thread count.
struct ThreadedScheduler {
// presumably the number of worker threads used to run checks — TODO confirm; not read anywhere yet
thread_count: usize,
}
impl Scheduler for ThreadedScheduler {
/// Not implemented yet: calling this panics via `todo!()`.
fn run(&self, checks: Vec<Box<dyn Check>>) -> Vec<(Box<dyn Check>, CheckResult)> {
todo!()
}
}
Python
class BaseScheduler:
    """Design sketch of the scheduler interface."""

    def run(self, checks: list[BaseCheck]) -> list[tuple[BaseCheck, CheckResult]]:
        """Take a list of checks and return ``(check, result)`` pairs (stub)."""
        ...


class ThreadedScheduler(BaseScheduler):
    """Design sketch of a scheduler variant derived from ``BaseScheduler``."""

    def run(self, checks: list[BaseCheck]) -> list[tuple[BaseCheck, CheckResult]]:
        """Take a list of checks and return ``(check, result)`` pairs (stub)."""
        ...
This would likely have an async variant: an `AsyncScheduler` trait in Rust and an `AsyncBaseScheduler` base class in Python. The threaded scheduler will be implemented with Rayon (https://crates.io/crates/rayon), and the async scheduler will likely be implemented with Tokio (https://crates.io/crates/tokio). The Tokio-based scheduler may be an optionally compiled `AsyncTokioScheduler`.