hippo4j icon indicating copy to clipboard operation
hippo4j copied to clipboard

[Feature] Add dynamic thread pool task metrics: minimum execution, maximum execution and average execution time

Open magestacks opened this issue 2 years ago • 9 comments

magestacks avatar Sep 14 '22 00:09 magestacks

我想试试看,不过我得先确认一下我对这个需求的理解有没有问题。

从简单的描述来看的话,作者大大是不是想要扩展类似这样的功能:

即针对每个被管理的线程池,需要能够感知并且管理每个被投递的任务的生命周期,包括且不限于:

  • 任务被处理的时间节点,可能包括:
    1. 被投递到线程池的时间节点;
    2. 被执行的时间节点;
    3. 执行完毕的时间节点;
  • 感知线程池中任务的状态,可能包括:
    1. 等待中:已被投递,处于等待队列;
    2. 执行中:被线程池消费,但是没有执行完毕;
    3. 执行完毕:执行完毕;
    4. 执行失败:执行过程中发生异常;

然后根据这些信息,就可以得到一些比如任务执行时间,失败任务数之类的综合性指标,不知道我理解的有没有问题。

Createsequence avatar Oct 20 '22 08:10 Createsequence

没有这么复杂,就是想通过线程池运行过的任务,得出三个指标:最小执行、最大执行和平均执行时间。

比如:

执行两个任务,一个执行时间 1s,另一个 2s。那么,对应的指标应该是:

  • 最小执行:1s
  • 最大执行:2s
  • 平均执行:1.5s

这一块我还没有想好要如何实现。如果实现比较耗费资源,那么这个需求就是不合理的。

magestacks avatar Oct 20 '22 14:10 magestacks

没有这么复杂,就是想通过线程池运行过的任务,得出三个指标:最小执行、最大执行和平均执行时间。

比如:

执行两个任务,一个执行时间 1s,另一个 2s。那么,对应的指标应该是:

  • 最小执行:1s
  • 最大执行:2s
  • 平均执行:1.5s

这一块我还没有想好要如何实现。如果实现比较耗费资源,那么这个需求就是不合理的。

这一块实现的话我大概有一点思路,完整版的功能或许可以通过三个组件完成:

  • 监控器:每个线程池实例绑定一个监视器,用于将普通任务封装为监控任务,并在内部维护这个线程池管理的所有监控任务的信息;

  • 任务包装器:将普通的 Runable 任务包装为 MonitoredTask,类似现有的 TaskDecorator

  • 监控任务:即普通 Runable 的封装,持有监控器引用,会在调用 run 方法的前后触发一些钩子方法,将信息传递给监视器,比如:

    public void run() {
        monitor.executeTask(); // 任务触发回调,可以记录任务触发时间
        try {
            runalble.run();
            monitor.completeTask(); // 任务完成回调,记录任务完成时间
        } catch(Exception e) {
            monitor.interruptTask(); // 任务失败回调,记录任务失败时间
            throw e;
        }
    }
    

    监控器将会在各个钩子方法调用时记录时间戳或者做一些其他的事情,这些信息可以存在内存,或者直接发往各类消费者,以便后期用于其他扩展或者进行数据分析,比如每隔一段时间将已完成的任务信息提取出来,计算线程池的各项指标,包括且不限于各种时间;

以上是个人的一点的思路,本质上是通过给任务实例加上一层代理,通过在各个阶段的植入钩子方法来收集信息。

实际上基于这个做法,后期还可以扩展一些失败重试之类的机制,增加一个钩子或许会在出乎意料的地方派上用场。

Createsequence avatar Oct 20 '22 15:10 Createsequence

听了上述的实现方案,更像是统计 metric,为了不引入复杂性,我理解只需要三个值即可。即在线程池实例中查看当前线程池运行参数,会出现这三个参数

magestacks avatar Oct 20 '22 15:10 magestacks

ok,那我先试试

Createsequence avatar Oct 20 '22 15:10 Createsequence

小马哥,我在读源码的时候,发现 DynamicThreadPoolExecutor 通过 ThreadPoolExecutor 提供的 executebeforeExecuteafterExecute 方法已经提供好了回调的钩子了,而也已经有关于开始时间和结束时间的处理逻辑了,如果要加上这个这三个时间指标的需求,直接在 DynamicThreadPoolExecutor 类里加上刷新时间的逻辑就可以。

不过,个人觉得直接往上加代码或许不是一个优雅的解决方案,我注意到目前 DynamicThreadPoolExecutor 中已经写入了多种固定的回调逻辑:

  • AbstractDynamicExecutorSupport 父类提供的线程池中断时等待执行剩余任务的逻辑;
  • RejectedProxyInvocationHandler 提供的当触发拒绝策略时,通过 ThreadPoolNotifyAlarmHandler 发送告警的逻辑;
  • RejectedProxyInvocationHandler 提供的记录拒绝任务数的逻辑;
  • TaskDecorator 提供的在执行任务前对 Runable 进行包装的逻辑;
  • ThreadPoolNotifyAlarmHandler 提供的在任务执行后检测任务执行是否超时,并发生告警的逻辑;

这些操作被直接固定在 DynamicThreadPoolExecutor,当需要进行扩展的时候——比如现在——往往需要直接修改 DynamicThreadPoolExecutor 类本身。

出于更好的管理回调方法,与对 DynamicThreadPoolExecutor 本身避免耦合过多功能的方面考虑,是否通过提供类似 SpringAware 接口,或是 PostProcessor,来统一的向 DynamicThreadPoolExecutor 各个执行阶段设置回调方法呢?

比如,我们可以在 DynamicThreadPoolExecutor 中设置这样几个回调节点,它们分别对应三个 Aware 接口里面的五个方法:

  • ExecuteAware:提供三个方法,可以在任务执行前、执行时、执行后触发回调;
  • RejectedAware:提供一个方法,在拒绝策略执行前回调;
  • ShutdownAware:提供一个方法,在线程池停止前回调;

基于该想法,DynamicThreadPoolExecutor 本身可以简化为一个具备注册和管理 Aware 接口的纯净类,而各项扩展功能都可以以 XXXAwareHandler 的方式挂载到这个核心类上。

比如:

  • 执行前包装任务的逻辑可以分离为 TaskDecoratorExecuteAwareHandler

  • 任务拒绝时告警的逻辑可以分离为 NotifyAlarmRejectedAwareHandler

  • 关闭线程池前等待执行剩余任务的逻辑可以分离为 TerminationShutdownAwareHandler

  • 记录线程执行时间的逻辑可以分离为 TimeWatchExecuteAwareHandler

    而根据执行时间选择是否发生超时告警的逻辑则可以基于前者扩展为 NotifiableTimeWatchExecuteAwareHandler

若有必要,用户也可以根据需要比较方便的自行挂载新的 Aware 处理器。

Createsequence avatar Oct 21 '22 08:10 Createsequence

我觉得这是个很好的设计,你能额外创建一个重构 Issue,并发起 PR 么?

magestacks avatar Oct 21 '22 08:10 magestacks

ok,待会我发个 Issues 吧,不过我可能需要比较多的时间来完成 :)

Createsequence avatar Oct 21 '22 08:10 Createsequence

可以的,这个具体看你的时间。

magestacks avatar Oct 21 '22 08:10 magestacks