blog
blog copied to clipboard
Java 的 Callable 与 Future 使用
trafficstars
Future 表示异步计算的结果,使用 get() 方法可以获得结果,当调用 get() 方法时如果未计算完成则会陷入阻塞状态。下边是用 Future 能实现的一些功能:
- 并行计算
- 任务手动取消
- 任务限时自动取消
- 异构任务并行计算
并行计算
如下,我们要计算 1-3E 之间的累加和:
public class Test {
static class Calc implements Callable<Long> {
private Long start;
private Long end;
Calc(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
public Long call() {
Long sum = 0L;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
Calc c1 = new Calc(1L, 100000000L);
Calc c2 = new Calc(100000001L, 200000000L);
Calc c3 = new Calc(200000001L, 300000000L);
Future<Long> r1 = executorService.submit(c1);
Future<Long> r2 = executorService.submit(c2);
Future<Long> r3 = executorService.submit(c3);
System.out.println(r1.get() + r2.get() + r3.get());
executorService.shutdown();
}
}
我们可以把其分为三个过程:1-1e、1e-2e、2e-3e,这样子就可以把计算并行为三个过程。
任务手动取消
这里调用的是 Future.cancel(bool mayInterruptIfRunning) 方法
- mayInterruptIfRunning 为 false:取消已经提交还未被运行的任务
- mayInterruptIfRunning 为 true:取消所有已经提交的任务
使用方法如下:
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
Calc c1 = new Calc(1L, 10000000000L);
Future<Long> r1 = executorService.submit(c1);
Thread.sleep(2000);
r1.cancel(true);
try {
System.out.println(r1.get());
} catch (CancellationException e) {
System.out.println("任务被取消");
} finally {
executorService.shutdown();
}
}
我在用的时候发现了一个线程池泄露的问题,当调用 Future.cancel() 时候,Future 并未被取消掉。这里需要明白一个概念:Java 中没有显示定义线程立即中断或停止的方法,需要手工解决,参见:https://github.com/penglongli/blog/issues/109
这里我们做一下改动:
public class Test {
static class Calc implements Callable<Long> {
private Long start;
private Long end;
Calc(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
public Long call() {
Long sum = 0L;
for (long i = start; i <= end; i++) {
if (Thread.currentThread().isInterrupted()) {
return null;
}
sum += i;
}
return sum;
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
Calc c1 = new Calc(1L, 10000000000L);
Future<Long> r1 = executorService.submit(c1);
Thread.sleep(2000);
r1.cancel(true);
try {
System.out.println(r1.get());
} catch (CancellationException e) {
System.out.println("任务被取消");
} catch (ExecutionException e) {
System.out.println("任务执行异常");
} finally {
executorService.shutdown();
}
}
}
任务限时自动取消
任务限时取消的方法是: Future.get(long timeout, TimeUnit unit) ,当然这里也会出现线程池泄露的情况,如下方法:
public class Test3 {
static class Calc implements Callable<Long> {
private Long start;
private Long end;
Calc(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
public Long call() {
Long sum = 0L;
for (long i = start; i <= end; i++) {
if (Thread.currentThread().isInterrupted()) {
// If interrupt, stop thread
return null;
}
}
return sum;
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
Calc c1 = new Calc(1L, 10000000000L);
Future<Long> r1 = executorService.submit(c1);
try {
r1.get(2000, TimeUnit.MILLISECONDS);
} catch (CancellationException e) {
System.out.println("Task is canceled");
} catch (ExecutionException e) {
System.out.println("Task occurred an exception when executing");
} catch (TimeoutException e) {
System.out.println("Task timeout");
// do cancel if Timeout
r1.cancel(true);
} finally {
executorService.shutdown();
}
}
}
异构任务并行计算
例如对于 Markdown 解析来说,第一步是解析所有文本,第二步是上传图片。解析文本是 CPU 密集型,上传图片是网络 I/O 密集型。对于此可以把这两步分为两个任务来并行执行