Add a job scheduler.

Open scott-wilson opened this issue 3 years ago • 4 comments

The scheduler is responsible for running the checks and getting the results.

A simple scheduler would look something like this in Python:

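import pychecks

# CheckThingA, CheckThingB, and CheckThingC stand in for concrete check classes.
my_checks = [
    CheckThingA(),
    CheckThingB(),
    CheckThingC(),
]

# Run the checks one after another and keep every result.
results = []
for check in my_checks:
    results.append(pychecks.run(check))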

However, depending on the context, we might be able to run the checks in multiple threads, async tasks, or processes. Alternatively, we could use a DAG scheduler, so that a check runs only if all of its parents pass.
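The DAG idea can be sketched with the standard library's `graphlib` module (Python 3.9+). This is a minimal, sequential sketch under stated assumptions, not the project's scheduler: each check is assumed to be a hypothetical zero-argument callable that returns `True` on pass, and `parents` is a hypothetical mapping from a check's name to the names of its parents. `run_dag` is an illustrative name.

```python
from graphlib import TopologicalSorter


def run_dag(checks, parents):
    """Run checks in dependency order, skipping any check whose parents did not all pass.

    checks:  dict mapping a check name to a zero-argument callable that returns True on pass
    parents: dict mapping a check name to the set of its parents' names (every parent
             must also be a key in checks)
    Returns a dict mapping each check name to "passed", "failed", or "skipped".
    """
    graph = {name: parents.get(name, set()) for name in checks}
    outcomes = {}
    # static_order() yields each check only after all of its parents and raises
    # graphlib.CycleError if the dependencies form a cycle
    for name in TopologicalSorter(graph).static_order():
        if all(outcomes[parent] == "passed" for parent in graph[name]):
            outcomes[name] = "passed" if checks[name]() else "failed"
        else:
            outcomes[name] = "skipped"
    return outcomes


# Hypothetical checks: B fails, so C (which depends on A and B) is skipped.
checks = {
    "CheckThingA": lambda: True,
    "CheckThingB": lambda: False,
    "CheckThingC": lambda: True,
}
parents = {"CheckThingB": {"CheckThingA"}, "CheckThingC": {"CheckThingA", "CheckThingB"}}
print(run_dag(checks, parents))
# {'CheckThingA': 'passed', 'CheckThingB': 'failed', 'CheckThingC': 'skipped'}
```

`TopologicalSorter` also offers `get_ready()` and `done()`, which would let a threaded or async variant start every check whose parents have finished, instead of walking one static order.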

scott-wilson, Dec 30 '22 04:12

import asyncio
import datetime
import logging
import random

import croniter


class Check:
    def __init__(self, name, cron, dependencies=None, max_retries=0, retry_delay=60):
        """
        A class representing a single check to be performed.

        Args:
            name (str): The name of the check.
            cron (str): A string in cron format that determines when the check should be run.
            dependencies (list of Check, optional): Other checks that must succeed before this check can run.
            max_retries (int, optional): The maximum number of times to retry the check after its first failed attempt.
            retry_delay (int, optional): The number of seconds to wait between retries.
        """
        self.name = name
        self.cron = cron
        self.dependencies = dependencies or []
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.current_retries = 0
        # no check has succeeded until it has actually run
        self.success = False

    async def run(self):
        """
        Perform the check and return the result.
        """
        # simulate a delay to make the check take some time
        await asyncio.sleep(random.randint(1, 3))
        return f"{self.name} check result"

    def seconds_until_next_run(self):
        """
        Return the number of seconds until the cron schedule next fires.
        """
        now = datetime.datetime.now()
        next_run = croniter.croniter(self.cron, now).get_next(datetime.datetime)
        return (next_run - now).total_seconds()

    def should_run(self):
        """
        Determine whether the check may run, based on its retry count and its dependencies.
        """
        if self.current_retries > self.max_retries:
            # don't retry the check if it has already used up all of its retries
            return False
        # don't run the check unless all of its dependencies have succeeded
        return all(dep.success for dep in self.dependencies)

    async def run_with_retry(self):
        """
        Wait for the next scheduled time, then run the check, retrying if it fails.
        """
        await asyncio.sleep(self.seconds_until_next_run())
        while self.should_run():
            try:
                result = await self.run()
            except Exception as e:
                self.success = False
                self.current_retries += 1
                logging.error(f"{self.name} check failed ({e}).")
                if self.current_retries <= self.max_retries:
                    logging.info(f"Retrying {self.name} in {self.retry_delay} seconds...")
                    await asyncio.sleep(self.retry_delay)
            else:
                self.success = True
                logging.info(f"{self.name} check succeeded: {result}")
                # stop after the first success instead of looping forever
                break


class Scheduler:
    def __init__(self, checks):
        """
        A class representing a scheduler for running checks.

        Args:
            checks (list of Check): The list of checks to be run. Every dependency
                of a check must also be in this list.
        """
        self.checks = checks

    async def run_checks(self):
        """
        Run the checks concurrently using asyncio; each check waits for its dependencies first.
        """
        tasks = {}

        async def run_after_dependencies(check):
            # wait until every dependency has finished (passed or failed); should_run()
            # then skips this check if any of them failed. Dependency cycles would deadlock.
            await asyncio.gather(*(tasks[dep.name] for dep in check.dependencies))
            await check.run_with_retry()

        # create one task per check; the tasks only start running at the await below,
        # so every dependency's task exists before any check looks it up
        for c in self.checks:
            tasks[c.name] = asyncio.create_task(run_after_dependencies(c))
        await asyncio.gather(*tasks.values())

    def start(self):
        """
        Set up logging and run the checks on a new event loop.
        """
        # configure logging
        logging.basicConfig(level=logging.INFO)
        # asyncio.run() creates the event loop and closes it when finished
        asyncio.run(self.run_checks())

QuincyForbes, Mar 06 '23 20:03

use std::error::Error;
use std::sync::Arc;
use std::time::Duration;

use chrono::Utc;
use cron::Schedule;
use futures::future::join_all;
// an async-aware mutex: its guard may be held across `.await` points
use tokio::sync::Mutex;

struct Check {
    name: String,
    cron: Schedule,
    dependencies: Vec<Arc<Mutex<Check>>>,
    max_retries: u32,
    retry_delay: u64,
    current_retries: u32,
    success: bool,
}

impl Check {
    async fn run(&self) -> Result<String, Box<dyn Error>> {
        // perform the check and return the result (simulated with a 1-3 second delay)
        tokio::time::sleep(Duration::from_secs(rand::random::<u64>() % 3 + 1)).await;
        Ok(format!("{} check result", self.name))
    }

    fn time_until_next_run(&self) -> Duration {
        // use chrono and cron to find how long to wait until the schedule next fires
        self.cron
            .upcoming(Utc)
            .next()
            .and_then(|next| (next - Utc::now()).to_std().ok())
            .unwrap_or(Duration::ZERO)
    }

    async fn should_run(&self) -> bool {
        if self.current_retries > self.max_retries {
            // don't retry the check if it has already used up all of its retries
            return false;
        }
        for dep in &self.dependencies {
            // locking waits until the dependency's own run has released it
            if !dep.lock().await.success {
                // don't run the check if any of its dependencies have failed
                return false;
            }
        }
        true
    }

    async fn run_with_retry(&mut self) {
        // wait for the next scheduled time, then run the check with retries if it fails
        tokio::time::sleep(self.time_until_next_run()).await;
        while self.should_run().await {
            match self.run().await {
                Ok(result) => {
                    self.success = true;
                    println!("{} check succeeded: {}", self.name, result);
                    break;
                }
                Err(e) => {
                    self.success = false;
                    self.current_retries += 1;
                    println!("{} check failed ({}).", self.name, e);
                    if self.current_retries <= self.max_retries {
                        println!("Retrying {} in {} seconds...", self.name, self.retry_delay);
                        tokio::time::sleep(Duration::from_secs(self.retry_delay)).await;
                    }
                }
            }
        }
    }
}

struct Scheduler {
    checks: Vec<Arc<Mutex<Check>>>,
}

impl Scheduler {
    fn new(checks: Vec<Arc<Mutex<Check>>>) -> Scheduler {
        // the checks already share their dependencies through `Arc`,
        // so no separate dependency map is needed here
        Scheduler { checks }
    }

    async fn run_checks(&self) {
        // each future holds its check's lock for the whole run, so a dependent that
        // locks this check in `should_run` waits until the run has finished
        // (a dependency cycle would deadlock)
        let futures = self.checks.iter().map(|check| async move {
            let mut check = check.lock().await;
            check.run_with_retry().await;
        });
        join_all(futures).await;
    }

    fn start(&self) -> Result<(), Box<dyn Error>> {
        // start the scheduler and run the checks
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        rt.block_on(self.run_checks());
        Ok(())
    }
}

fn main() -> Result<(), Box<dyn Error>> {
    // build the checks in dependency order and share them through `Arc`, so that
    // dependents see the same state the scheduler updates (a clone would be a copy)
    let a = Arc::new(Mutex::new(Check {
        name: "CheckThingA".into(),
        cron: "*/5 * * * * * *".parse()?,
        dependencies: vec![],
        max_retries: 3,
        retry_delay: 1,
        current_retries: 0,
        success: false,
    }));
    let b = Arc::new(Mutex::new(Check {
        name: "CheckThingB".into(),
        cron: "*/10 * * * * * *".parse()?,
        dependencies: vec![a.clone()],
        max_retries: 2,
        retry_delay: 2,
        current_retries: 0,
        success: false,
    }));
    let c = Arc::new(Mutex::new(Check {
        name: "CheckThingC".into(),
        cron: "*/15 * * * * * *".parse()?,
        dependencies: vec![a.clone(), b.clone()],
        max_retries: 1,
        retry_delay: 3,
        current_retries: 0,
        success: false,
    }));
    let scheduler = Scheduler::new(vec![a, b, c]);
    scheduler.start()
}

QuincyForbes, Mar 06 '23 20:03

Hey, could you make this into a pull request? It looks like it may work, but it would be easier to review as a PR.

scott-wilson, Mar 06 '23 21:03

I did some thinking on this, and I have a rough design that might work.

Rust

trait Scheduler {
    fn run(&self, checks: Vec<Box<dyn Check>>) -> Vec<(Box<dyn Check>, CheckResult)>;
}

struct ThreadedScheduler {
    thread_count: usize,
}

impl Scheduler for ThreadedScheduler {
    fn run(&self, checks: Vec<Box<dyn Check>>) -> Vec<(Box<dyn Check>, CheckResult)> {
        todo!()
    }
}

Python

class BaseScheduler:
    def run(self, checks: list[BaseCheck]) -> list[tuple[BaseCheck, CheckResult]]: ...


class ThreadedScheduler(BaseScheduler):
    def run(self, checks: list[BaseCheck]) -> list[tuple[BaseCheck, CheckResult]]: ...
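One way the Python `ThreadedScheduler` stub might be filled in, sketched with the standard library's `concurrent.futures.ThreadPoolExecutor`. This is an illustration under assumptions, not the planned implementation: `BaseCheck` here is a hypothetical stand-in whose `run()` returns `True` on pass, that boolean takes the place of `CheckResult`, and `FailingCheck` exists only for the usage example.

```python
from concurrent.futures import ThreadPoolExecutor


class BaseCheck:
    """Hypothetical stand-in for the project's check base class."""

    def __init__(self, name: str):
        self.name = name

    def run(self) -> bool:
        # Hypothetical convention: True means the check passed.
        return True


class BaseScheduler:
    def run(self, checks: list[BaseCheck]) -> list[tuple[BaseCheck, bool]]:
        raise NotImplementedError


class ThreadedScheduler(BaseScheduler):
    def __init__(self, thread_count: int = 4):
        self.thread_count = thread_count

    def run(self, checks: list[BaseCheck]) -> list[tuple[BaseCheck, bool]]:
        # Run each check on a worker thread. pool.map() returns results in the
        # same order as the input, so each result can be zipped with its check.
        with ThreadPoolExecutor(max_workers=self.thread_count) as pool:
            results = list(pool.map(lambda check: check.run(), checks))
        return list(zip(checks, results))


class FailingCheck(BaseCheck):
    def run(self) -> bool:
        return False


pairs = ThreadedScheduler(thread_count=2).run([BaseCheck("A"), FailingCheck("B")])
print([(check.name, passed) for check, passed in pairs])  # [('A', True), ('B', False)]
```

Because of the GIL, Python threads mainly help checks that wait on I/O or release the GIL; CPU-bound checks would benefit more from the Rust-side threading.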

This would likely also have an async variant: an AsyncScheduler trait in Rust and an AsyncBaseScheduler base class in Python. The threaded scheduler will be implemented with Rayon (https://crates.io/crates/rayon), and the async scheduler will likely be implemented with Tokio (https://crates.io/crates/tokio), possibly as an optionally compiled AsyncTokioScheduler.
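On the Python side, an async variant could mirror the same `run(checks)` shape with `asyncio`. This is a rough sketch under assumptions: `AsyncBaseCheck` and `AsyncGatherScheduler` are hypothetical names, each check's `run()` is assumed to be a coroutine returning `True` on pass, and the boolean again stands in for `CheckResult`.

```python
import asyncio


class AsyncBaseCheck:
    """Hypothetical stand-in for an async check; run() returns True on pass."""

    def __init__(self, name: str, delay: float = 0.0, passes: bool = True):
        self.name = name
        self.delay = delay
        self.passes = passes

    async def run(self) -> bool:
        await asyncio.sleep(self.delay)  # stand-in for real I/O-bound work
        return self.passes


class AsyncBaseScheduler:
    async def run(self, checks: list[AsyncBaseCheck]) -> list[tuple[AsyncBaseCheck, bool]]:
        raise NotImplementedError


class AsyncGatherScheduler(AsyncBaseScheduler):
    async def run(self, checks: list[AsyncBaseCheck]) -> list[tuple[AsyncBaseCheck, bool]]:
        # gather() runs all checks concurrently on one event loop and returns
        # their results in the same order as the input, not completion order
        results = await asyncio.gather(*(check.run() for check in checks))
        return list(zip(checks, results))


checks = [AsyncBaseCheck("A", delay=0.02), AsyncBaseCheck("B", delay=0.01, passes=False)]
pairs = asyncio.run(AsyncGatherScheduler().run(checks))
print([(check.name, passed) for check, passed in pairs])  # [('A', True), ('B', False)]
```

Here B finishes first, but the results still line up with the input order because `gather` preserves it.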

scott-wilson, Dec 04 '24 05:12