javaweb icon indicating copy to clipboard operation
javaweb copied to clipboard

ThreadPoolExecutor

Open www1350 opened this issue 8 years ago • 0 comments

线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

  • corePoolSize: 线程池维护线程的最少数量

  • maximumPoolSize:线程池维护线程的最大数量

  • keepAliveTime: 线程池维护线程所允许的空闲时间

  • unit: 线程池维护线程所允许的空闲时间的单位

  • workQueue: 线程池所使用的缓冲队列

  • handler: 线程池对拒绝任务的处理策略

    一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是Runnable类型对象的run()方法。

当一个任务通过execute(Runnable)方法欲添加到线程池时:

  1. 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
  2. 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
  3. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
  4. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
  5. 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

handler有四个选择: ThreadPoolExecutor.AbortPolicy() 抛出java.util.concurrent.RejectedExecutionException异常

ThreadPoolExecutor.CallerRunsPolicy()

当抛出RejectedExecutionException异常时,会调用rejectedExecution方法 (如果主线程没有关闭,则主线程调用run方法,源码如下

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
)
ThreadPoolExecutor.DiscardOldestPolicy()

抛弃旧的任务

ThreadPoolExecutor.DiscardPolicy()

抛弃当前的任务

下面是一个用线程池做日志记录的例子

     /**
     * 线程池
     */
    private static ThreadPoolExecutor producerPool;
    static {
        // 构造一个线程池
        producerPool = new ThreadPoolExecutor(ConfigUtil.getConfig().getCorePoolSize(), ConfigUtil.getConfig().getMaximumPoolSize(), ConfigUtil.getConfig().getKeepAliveTime(),
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    /**
     * 写入日志
     *
     * @param t
     * @param <T>
     */
    public <T extends BaseLog> void log(T t) {
            producerPool.execute(new ServiceLogTask(t));
    }

    // 异步线程执行写入日志操作
    class ServiceLogTask implements Runnable, Serializable {
        private BaseLog log;
        public ServiceLogTask(BaseLog log) {
            if (log != null) {
                this.log = log;
            } else {
                throw new IllegalArgumentException("参数不可为空");
            }
        }

        public void run() {
            try {
                insertLog();
            } catch (Exception e) {
                //将失败的数据保存到Log4J,便于之后再处理
                log.setDec(e.getMessage());
                logger.error(JSONParser.convertObjectToJson(log));
            }
        }

        public void insertLog() {
            String switchValue = haveServiceLogSwitch(this.log.getLogCategory());
            if (StringUtils.isEmpty(switchValue) || SWITCH_ON.equalsIgnoreCase(switchValue)) {
               //修改为客户端直接存入数据
               String jsonLogString = JSONParser.convertObjectToJson(this.log);
                DBObject dbObject = (DBObject)JSON.parse(jsonLogString);
                String type = dbObject.get(LOG_TYPE_FIELD) == null ? "" : dbObject.get(LOG_TYPE_FIELD).toString();
                LogType logType = LogType.valueOf(type);
                if(logType!=null){
                    dbObject.removeField(LOG_TYPE_FIELD) ;
                    MongoFactory.getInstance().getConnection(logType.getCollectionName()).save(dbObject);
                }

            }
        }



public class ConfigUtil {
    private static final ConfigUtil config = new ConfigUtil();

    /**
     * 线程池维护线程的最少数量
     */
    private int corePoolSize;

    /**
     * 线程池维护线程的最大数量
     */
    private int maximumPoolSize;

    /**
     * 线程池维护线程所允许的空闲时间(s)
     */
    private long keepAliveTime;

    /**
     * 数据库对应的名字
     */
    private String dbname ;

    /**
     * mongodb维护的每个主机的最大连接数,Mongo类中已经实现
     */
    private int connectionsPerHost;

    /**
     * mongo维护的可等待的线程倍数
     */
    private int threadsAllowedToBlockForConnectionMultiplie;


    /**
     * 添加支持集群的模式
     */
    private ArrayList<ServerAddress> servers;

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;
    private ConfigUtil() {
        //初始化配置读取
        Properties configMessage = new Properties();
        try {
            configMessage.load(ConfigUtil.class.getResourceAsStream("/conf/logConfig.properties"));
            servers=loadAddress(configMessage);
            corePoolSize = getInt(configMessage, "corePoolSize", 5);
            maximumPoolSize = getInt(configMessage, "maximumPoolSize", 100);
            keepAliveTime = getLong(configMessage, "keepAliveTime", 100);
            dbname  = getString(configMessage,"mongo.dbname");
            connectionsPerHost = getInt(configMessage,"mongo.connectionsPerHost",100);
            threadsAllowedToBlockForConnectionMultiplie = getInt(configMessage,"mongo.threadsAllowedToBlockForConnectionMultiplier",20);
            username=getString(configMessage,"mongo.username") ;
            password=getString(configMessage,"mongo.password");
        } catch (Exception e) {
            throw new RuntimeException("读取配置文件logConfig.properties错误", e);
        }
    }

    public static ConfigUtil getConfig() {
        return config;
    }

    public int getCorePoolSize() {
        return corePoolSize;
    }

    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    public long getKeepAliveTime() {
        return keepAliveTime;
    }

    public String getDbname() {
        return dbname;
    }
    private static long getLong(Properties config, String key, long defaultValue) {
        String value = config.getProperty(key);
        if (StringUtils.isNotEmpty(value)) {
            return Long.valueOf(value);
        }
        return defaultValue;
    }

    private static int getInt(Properties config, String key, int defaultValue) {
        String value = config.getProperty(key);
        if (StringUtils.isNotEmpty(value)) {
            return Integer.valueOf(value);
        }
        return defaultValue;
    }

    private static String getString(Properties config, String key) {
        String value = config.getProperty(key);
        if (StringUtils.isNotEmpty(value)) {
            return value;
        }
        throw new RuntimeException(key + "值不存在");
    }
    private static ArrayList<ServerAddress> loadAddress(Properties config){
        ArrayList<ServerAddress> servers=new ArrayList<ServerAddress>();
        String value = config.getProperty("mongo.host");
        if(StringUtils.isNotEmpty(value)){
          String[] hosts = value.split(",");
          for(int i=0;i<hosts.length;i++){
              try{
                  String[] message = hosts[i].split(":");
                  String ip = message[0];
                  int port =Integer.parseInt(message[1]);
                  ServerAddress serverAddress = new ServerAddress(ip,port);
                  servers.add(serverAddress);
              } catch (Exception e){
                  e.printStackTrace();
                  throw new RuntimeException("解析mongodb地址部分失败")  ;
              }
          }
          return servers;
        }
        throw  new RuntimeException("mongo.host配置错误,请检查");
    }

    public int getConnectionsPerHost() {
        return connectionsPerHost;
    }

    public int getThreadsAllowedToBlockForConnectionMultiplie() {
        return threadsAllowedToBlockForConnectionMultiplie;
    }

    public ArrayList<ServerAddress> getServers() {
        return servers;
    }

    public String getUsername() {
        return username;
    }

    public String getPassword() {
        return password;
    }
}

tasks :每秒的任务数,假设为500~1000 taskcost:每个任务花费时间,假设为0.1s responsetime:系统允许容忍的最大响应时间,假设为1s 做几个计算 corePoolSize = 每秒需要多少个线程处理? threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可 queueCapacity = (coreSizePool/taskcost)responsetime 计算可得 queueCapacity = 80/0.11 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。 maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost) 计算可得 maxPoolSize = (1000-80)/10 = 92 (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数 rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理 (https://my.oschina.net/u/169390/blog/97415) keepAliveTime和allowCoreThreadTimeout采用默认通常能满足

www1350 avatar Apr 13 '16 13:04 www1350