javaweb
javaweb copied to clipboard
ThreadPoolExecutor
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
-
corePoolSize: 线程池维护线程的最少数量
-
maximumPoolSize:线程池维护线程的最大数量
-
keepAliveTime: 线程池维护线程所允许的空闲时间
-
unit: 线程池维护线程所允许的空闲时间的单位
-
workQueue: 线程池所使用的缓冲队列
-
handler: 线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
- 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
- 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
- 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
- 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
- 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
handler有四个选择: ThreadPoolExecutor.AbortPolicy() 抛出java.util.concurrent.RejectedExecutionException异常
ThreadPoolExecutor.CallerRunsPolicy()
当抛出RejectedExecutionException异常时,会调用rejectedExecution方法 (如果主线程没有关闭,则主线程调用run方法,源码如下
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
)
ThreadPoolExecutor.DiscardOldestPolicy()
抛弃旧的任务
ThreadPoolExecutor.DiscardPolicy()
抛弃当前的任务
下面是一个用线程池做日志记录的例子
/**
* 线程池
*/
private static ThreadPoolExecutor producerPool;
static {
// 构造一个线程池
producerPool = new ThreadPoolExecutor(ConfigUtil.getConfig().getCorePoolSize(), ConfigUtil.getConfig().getMaximumPoolSize(), ConfigUtil.getConfig().getKeepAliveTime(),
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());
}
/**
* 写入日志
*
* @param t
* @param <T>
*/
public <T extends BaseLog> void log(T t) {
producerPool.execute(new ServiceLogTask(t));
}
// 异步线程执行写入日志操作
class ServiceLogTask implements Runnable, Serializable {
private BaseLog log;
public ServiceLogTask(BaseLog log) {
if (log != null) {
this.log = log;
} else {
throw new IllegalArgumentException("参数不可为空");
}
}
public void run() {
try {
insertLog();
} catch (Exception e) {
//将失败的数据保存到Log4J,便于之后再处理
log.setDec(e.getMessage());
logger.error(JSONParser.convertObjectToJson(log));
}
}
public void insertLog() {
String switchValue = haveServiceLogSwitch(this.log.getLogCategory());
if (StringUtils.isEmpty(switchValue) || SWITCH_ON.equalsIgnoreCase(switchValue)) {
//修改为客户端直接存入数据
String jsonLogString = JSONParser.convertObjectToJson(this.log);
DBObject dbObject = (DBObject)JSON.parse(jsonLogString);
String type = dbObject.get(LOG_TYPE_FIELD) == null ? "" : dbObject.get(LOG_TYPE_FIELD).toString();
LogType logType = LogType.valueOf(type);
if(logType!=null){
dbObject.removeField(LOG_TYPE_FIELD) ;
MongoFactory.getInstance().getConnection(logType.getCollectionName()).save(dbObject);
}
}
}
public class ConfigUtil {
private static final ConfigUtil config = new ConfigUtil();
/**
* 线程池维护线程的最少数量
*/
private int corePoolSize;
/**
* 线程池维护线程的最大数量
*/
private int maximumPoolSize;
/**
* 线程池维护线程所允许的空闲时间(s)
*/
private long keepAliveTime;
/**
* 数据库对应的名字
*/
private String dbname ;
/**
* mongodb维护的每个主机的最大连接数,Mongo类中已经实现
*/
private int connectionsPerHost;
/**
* mongo维护的可等待的线程倍数
*/
private int threadsAllowedToBlockForConnectionMultiplie;
/**
* 添加支持集群的模式
*/
private ArrayList<ServerAddress> servers;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
private ConfigUtil() {
//初始化配置读取
Properties configMessage = new Properties();
try {
configMessage.load(ConfigUtil.class.getResourceAsStream("/conf/logConfig.properties"));
servers=loadAddress(configMessage);
corePoolSize = getInt(configMessage, "corePoolSize", 5);
maximumPoolSize = getInt(configMessage, "maximumPoolSize", 100);
keepAliveTime = getLong(configMessage, "keepAliveTime", 100);
dbname = getString(configMessage,"mongo.dbname");
connectionsPerHost = getInt(configMessage,"mongo.connectionsPerHost",100);
threadsAllowedToBlockForConnectionMultiplie = getInt(configMessage,"mongo.threadsAllowedToBlockForConnectionMultiplier",20);
username=getString(configMessage,"mongo.username") ;
password=getString(configMessage,"mongo.password");
} catch (Exception e) {
throw new RuntimeException("读取配置文件logConfig.properties错误", e);
}
}
public static ConfigUtil getConfig() {
return config;
}
public int getCorePoolSize() {
return corePoolSize;
}
public int getMaximumPoolSize() {
return maximumPoolSize;
}
public long getKeepAliveTime() {
return keepAliveTime;
}
public String getDbname() {
return dbname;
}
private static long getLong(Properties config, String key, long defaultValue) {
String value = config.getProperty(key);
if (StringUtils.isNotEmpty(value)) {
return Long.valueOf(value);
}
return defaultValue;
}
private static int getInt(Properties config, String key, int defaultValue) {
String value = config.getProperty(key);
if (StringUtils.isNotEmpty(value)) {
return Integer.valueOf(value);
}
return defaultValue;
}
private static String getString(Properties config, String key) {
String value = config.getProperty(key);
if (StringUtils.isNotEmpty(value)) {
return value;
}
throw new RuntimeException(key + "值不存在");
}
private static ArrayList<ServerAddress> loadAddress(Properties config){
ArrayList<ServerAddress> servers=new ArrayList<ServerAddress>();
String value = config.getProperty("mongo.host");
if(StringUtils.isNotEmpty(value)){
String[] hosts = value.split(",");
for(int i=0;i<hosts.length;i++){
try{
String[] message = hosts[i].split(":");
String ip = message[0];
int port =Integer.parseInt(message[1]);
ServerAddress serverAddress = new ServerAddress(ip,port);
servers.add(serverAddress);
} catch (Exception e){
e.printStackTrace();
throw new RuntimeException("解析mongodb地址部分失败") ;
}
}
return servers;
}
throw new RuntimeException("mongo.host配置错误,请检查");
}
public int getConnectionsPerHost() {
return connectionsPerHost;
}
public int getThreadsAllowedToBlockForConnectionMultiplie() {
return threadsAllowedToBlockForConnectionMultiplie;
}
public ArrayList<ServerAddress> getServers() {
return servers;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
}
tasks :每秒的任务数,假设为500~1000 taskcost:每个任务花费时间,假设为0.1s responsetime:系统允许容忍的最大响应时间,假设为1s 做几个计算 corePoolSize = 每秒需要多少个线程处理? threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可 queueCapacity = (coreSizePool/taskcost)responsetime 计算可得 queueCapacity = 80/0.11 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。 maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost) 计算可得 maxPoolSize = (1000-80)/10 = 92 (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数 rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理 (https://my.oschina.net/u/169390/blog/97415) keepAliveTime和allowCoreThreadTimeout采用默认通常能满足