Future、Callable
Runnable 缺陷
- 不能返回一个返回值
- run() 方法中不能抛出 checked Exception (方法签名),只能在 run() 方法中使用 try/catch 进行异常捕获。
public abstract void run();
Callable 接口
- 类似于 Runnable,被其他线程执行的任务
- 实现 call 方法
- 有返回值
Future
Future 的核心思想是:一个方法的计算过程可能非常耗时,一直在原地等待方法返回,显然不明智。可以把该计算过程放到线程池去执行,并通过 Future 去控制方法的计算过程,在计算出结果后直接获取该结果。
Callable 和 Future 的关系
- Future.get() 来获取 Callable 接口返回的执行结果,Future.isDone()来判断任务是否已经执行完了,以及取消这个任务,限时获取任务的结果等。
- 在 call() 未执行完毕之前,调用 get() 的线程(假设是主线程)会被阻塞,直到 call() 方法返回了结果后,此时 future.get() 才会得到该结果,然后主线程 CIA 会切换到 runnable 状态。
- Future 是一个存储器,他存储了 call() 这个任务的结果,而这个任务的执行时间是无法提前确定的,因为这完全取决于 call() 方法执行的情况
Future 的常用方法
- get() 方法的行为取决于 Callable 任务的状态,只有以下 5 种情况:
1. 任务正常完成:get() 方法会立刻返回结果
2. 任务尚未完成(任务还没开始或进行中):get() 将阻塞并直到任务完成。
3. 任务执行过程中抛出 Exception:get() 方法会抛出 `ExecutionException` (这里抛出的`ExecutionException`是 call() 执行时产生的异常,不论 call() 执行时抛出异常的类型是什么,最后get() 方法抛出的异常类型都是 `ExecutionException`)
4. 任务被取消: get() 方法会抛出 `CancellationException`
5. 任务超时: get() 方法有一个重载方法, 是传入一个延迟时间的,如果时间到了还没有获得结果, get() 方法就会抛出 TimeoutException。
- get(long timeout, TimeUnit unit):有超时的获取
* 超时的需要很常见
* 用 get(long timeout,TimeUnit unit) 方法时,如果 call() 在规定时间内完成了任务,那么就会正常获取到返回值;而如果在指定时间内没有计算出结果,那么就会抛出 TimeoutException。
* 超时不获取,任务需要取消(已经不需要了, 因为超时了,取消吧!)
- isDone() 方法:判断线程是否执行完毕
- isCancelled() 方法:判断是否被取消
- cancel():取消任务
* 取消任务的执行
ThreadPool 的 submit 方法返回 Future 对象
首先,我们要给线程池提交我们的任务,提交时线程池会立刻返回给我们一个空的 Future 容器。当线程的任务一旦执行完毕,也就是当我们可以获取结果的时候,线程池便会把该结果填入到空的 Future 容器中,此时便可以从 Future 中获得任务的执行结果。
Future 正常的使用方式
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 描述: 演示一个Future的使用方法
*/
public class OneFuture {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
Future<Integer> future = service.submit(new CallableTask());
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
service.shutdown();
}
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return new Random().nextInt(); //2129913364
}
}
}
Future 的 Lambda 形式
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 描述: 演示一个Future的使用方法,lambda形式
*/
public class OneFutureLambda {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
//Lambda 形式
Callable<Integer> callable = () -> {
Thread.sleep(3000);
return new Random().nextInt(); //2129913364
};
Future<Integer> future = service.submit(callable);
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
service.shutdown();
}
}
多个任务:使用 Futrue 数组来获取结果
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 描述: 演示批量提交任务时,用List来批量接收多个任务的结果
*/
public class MultiFutures {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(20);
ArrayList<Future> futures = new ArrayList<>();
for (int i = 0; i < 20; i++) {
//提交任务
Future<Integer> future = service.submit(new CallableTask());
futures.add(future);
}
for (int i = 0; i < 20; i++) {
Future<Integer> future = futures.get(i);
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
service.shutdown();
}
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return new Random().nextInt();
}
}
}
2010666636
-788311394
2011169962
84341953
-1682064941
1859651109
765166231
739918142
547060157
-1931733995
1493835634
-2114927045
2035652866
-323151289
1598374730
2041121102
-385438633
750789799
728797692
-1753371291
get() 方法过程中抛出异常:
演示 get 方法过程中抛出异常,for 循环为了演示抛出 Exception 的时机:并不是说一产生异常就抛出,直到我们 get 执行时,才会抛出。
/**
* 描述: 演示get方法过程中抛出异常,for循环为了演示抛出Exception的时机:并不是说一产生异常就抛出,直到我们get执行时,才会抛出。
*/
public class GetException {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(20);
Future<Integer> future = service.submit(new CallableTask());
try {
for (int i = 0; i < 5; i++) {
System.out.println(i);
Thread.sleep(500);
}
System.out.println(future.isDone()); //true 线程执行完毕( 这里只关系线程是否运行结束,并不关心是否执行成功,抛出异常等… 只要运行结束就是 true)
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("InterruptedException异常");
} catch (ExecutionException e) { //在这里捕获到
e.printStackTrace();
System.out.println("ExecutionException异常");
}
}
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
throw new IllegalArgumentException("Callable抛出异常");
}
}
}
0
1
2
3
4
true
ExecutionException异常
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Callable抛出异常
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at test.future.GetException.main(GetException.java:26)
cancel():演示 cancel 传入 true 和 false 的区别,代表是否中断正在执行的任务。
当被中断的任务有能力处理 interrupt 信号的时候可以传入 true。
当被中断的任务没有能力处理 interrupt 信号的时候(或者我也不清楚被中断的任务有没有这个能力),则可以传入 false。 或者想要停止那些还没有开始运行的任务,也可以传入 false。
* 如果这个任务还没有开始执行,任务会被正常取消,未来也不会指定,方法返回 true。
* 如果任务已完成,或者已经取消:那么 cancel() 方法会执行失败,返回 false.
* 如果这个任务已经开始执行了,那么这个取消方法不会直接取消任务,而是会根据我们填的参数[true | false] 做判断: true 就发中断信号,flase 就不发中断信号(false 的最大意义在于阻止那些还未开始运行的任务,对于已经运行的任务则无效)
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* 描述: 演示get的超时方法,需要注意超时后处理,调用future.cancel()。演示cancel传入true和false的区别,代表是否中断正在执行的任务。
*/
public class Timeout {
private static final Ad DEFAULT_AD = new Ad("无网络时候的默认广告");
private static final ExecutorService exec = Executors.newFixedThreadPool(10);
static class Ad {
String name;
public Ad(String name) {
this.name = name;
}
@Override
public String toString() {
return "Ad{" +
"name='" + name + '\'' +
'}';
}
}
static class FetchAdTask implements Callable<Ad> {
@Override
public Ad call() throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("sleep期间被中断了");
return new Ad("被中断时候的默认广告");//这里不生效,超时之后就已经没有意义了
}
return new Ad("旅游订票哪家强?找某程");
}
}
public void printAd() {
Future<Ad> f = exec.submit(new FetchAdTask());
Ad ad;
try {
ad = f.get(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
ad = new Ad("被中断时候的默认广告");
} catch (ExecutionException e) {
ad = new Ad("异常时候的默认广告");
} catch (TimeoutException e) { //超时之后应该主动取消线程执行
ad = new Ad("超时时候的默认广告"); //兜底数据
System.out.println("超时,未获取到广告");
boolean cancel = f.cancel(true);//true 对任务线程发起中断信号 false 则代表不会发起中断信号
System.out.println("cancel的结果:" + cancel);
}
exec.shutdown();
System.out.println(ad);
}
public static void main(String[] args) {
Timeout timeout = new Timeout();
timeout.printAd();
}
}
超时,未获取到广告
cancel的结果:true
sleep期间被中断了
Ad{name='超时时候的默认广告'}
FutureTask 当作 Future 使用
- FutureTask 来获取 Future 和 任务的结果。
- FutureTask 是一种包装器,可以把 Callable 转换成 Futrue 和 Runnable,它同时实现二者的接口。FutureTask 既是 Runnable 又是 Futrue。
- 所以它既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。
FutrueTask 用法
把 Callable 实例当作参数,生成 FutureTask 的对象,然后把这个对象当作一个 Runnable 对象,用线程池或另起线程去执行这个对象,最后通过 FutureTask 获取刚才执行的结果。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
/**
* 描述: 演示FutureTask的用法
*/
public class FutureTaskDemo {
public static void main(String[] args) {
//构建 Callable 实例
Task task = new Task();
//构建 FutureTask 实例
FutureTask<Integer> integerFutureTask = new FutureTask<>(task);
//执行 FutureTask 实例
//new Thread(integerFutureTask).start(); //线程
ExecutorService service = Executors.newCachedThreadPool(); //线程池
service.submit(integerFutureTask);
try {
//获取 FutureTask 运行结果
System.out.println("task运行结果:"+integerFutureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程正在计算");
Thread.sleep(3000);
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
}
子线程正在计算
task运行结果:4950
Future 注意点
- 当 for 循环批量获取 future 的结果时,容易发生一部分线程很慢的情况,get() 方法调用时应使用 timeout 限制。(多个任务,任务的内容不同,耗时也不同),会阻塞在最长的任务(等待)。
- Future 的生命周期不能后退。(一旦完成任务,它永久停留在“已完成”的状态,不能重头再来)