Future、Callable
Runnable 缺陷
- 不能返回一个返回值
- run() 方法中不能抛出 checked Exception (方法签名),只能在 run() 方法中使用 try/catch 进行异常捕获。
1public abstract void run();
Callable 接口
- 类似于 Runnable,被其他线程执行的任务
- 实现 call 方法
- 有返回值
Future
Future 的核心思想是:一个方法的计算过程可能非常耗时,一直在原地等待方法返回,显然不明智。可以把该计算过程放到线程池去执行,并通过 Future 去控制方法的计算过程,在计算出结果后直接获取该结果。
Callable 和 Future 的关系
- Future.get() 来获取 Callable 接口返回的执行结果,Future.isDone()来判断任务是否已经执行完了,以及取消这个任务,限时获取任务的结果等。
- 在 call() 未执行完毕之前,调用 get() 的线程(假设是主线程)会被阻塞,直到 call() 方法返回了结果后,此时 future.get() 才会得到该结果,然后主线程 CIA 会切换到 runnable 状态。
- Future 是一个存储器,他存储了 call() 这个任务的结果,而这个任务的执行时间是无法提前确定的,因为这完全取决于 call() 方法执行的情况
Future 的常用方法
- get() 方法的行为取决于 Callable 任务的状态,只有以下 5 种情况:
11. 任务正常完成:get() 方法会立刻返回结果
22. 任务尚未完成(任务还没开始或进行中):get() 将阻塞并直到任务完成。
33. 任务执行过程中抛出 Exception:get() 方法会抛出 `ExecutionException` (这里抛出的`ExecutionException`是 call() 执行时产生的异常,不论 call() 执行时抛出异常的类型是什么,最后get() 方法抛出的异常类型都是 `ExecutionException`)
44. 任务被取消: get() 方法会抛出 `CancellationException`
55. 任务超时: get() 方法有一个重载方法, 是传入一个延迟时间的,如果时间到了还没有获得结果, get() 方法就会抛出 TimeoutException。
- get(long timeout, TimeUnit unit):有超时的获取
1* 超时的需要很常见
2* 用 get(long timeout,TimeUnit unit) 方法时,如果 call() 在规定时间内完成了任务,那么就会正常获取到返回值;而如果在指定时间内没有计算出结果,那么就会抛出 TimeoutException。
3* 超时不获取,任务需要取消(已经不需要了, 因为超时了,取消吧!)
- isDone() 方法:判断线程是否执行完毕
- isCancelled() 方法:判断是否被取消
- cancel():取消任务
1* 取消任务的执行
ThreadPool 的 submit 方法返回 Future 对象
首先,我们要给线程池提交我们的任务,提交时线程池会立刻返回给我们一个空的 Future 容器。当线程的任务一旦执行完毕,也就是当我们可以获取结果的时候,线程池便会把该结果填入到空的 Future 容器中,此时便可以从 Future 中获得任务的执行结果。
Future 正常的使用方式
1import java.util.Random;
2import java.util.concurrent.Callable;
3import java.util.concurrent.ExecutionException;
4import java.util.concurrent.ExecutorService;
5import java.util.concurrent.Executors;
6import java.util.concurrent.Future;
7
8/**
9 * 描述: 演示一个Future的使用方法
10 */
11public class OneFuture {
12
13 public static void main(String[] args) {
14 ExecutorService service = Executors.newFixedThreadPool(10);
15 Future<Integer> future = service.submit(new CallableTask());
16 try {
17 System.out.println(future.get());
18 } catch (InterruptedException e) {
19 e.printStackTrace();
20 } catch (ExecutionException e) {
21 e.printStackTrace();
22 }
23 service.shutdown();
24 }
25
26 static class CallableTask implements Callable<Integer> {
27
28 @Override
29 public Integer call() throws Exception {
30 Thread.sleep(3000);
31 return new Random().nextInt(); //2129913364
32 }
33 }
34
35}
Future 的 Lambda 形式
1import java.util.Random;
2import java.util.concurrent.Callable;
3import java.util.concurrent.ExecutionException;
4import java.util.concurrent.ExecutorService;
5import java.util.concurrent.Executors;
6import java.util.concurrent.Future;
7
8/**
9 * 描述: 演示一个Future的使用方法,lambda形式
10 */
11public class OneFutureLambda {
12
13 public static void main(String[] args) {
14 ExecutorService service = Executors.newFixedThreadPool(10);
15
16 //Lambda 形式
17 Callable<Integer> callable = () -> {
18 Thread.sleep(3000);
19 return new Random().nextInt(); //2129913364
20 };
21
22 Future<Integer> future = service.submit(callable);
23 try {
24 System.out.println(future.get());
25 } catch (InterruptedException e) {
26 e.printStackTrace();
27 } catch (ExecutionException e) {
28 e.printStackTrace();
29 }
30 service.shutdown();
31 }
32
33}
多个任务:使用 Futrue 数组来获取结果
1import java.util.ArrayList;
2import java.util.Random;
3import java.util.concurrent.Callable;
4import java.util.concurrent.ExecutionException;
5import java.util.concurrent.ExecutorService;
6import java.util.concurrent.Executors;
7import java.util.concurrent.Future;
8
9/**
10 * 描述: 演示批量提交任务时,用List来批量接收多个任务的结果
11 */
12public class MultiFutures {
13
14 public static void main(String[] args) throws InterruptedException {
15 ExecutorService service = Executors.newFixedThreadPool(20);
16
17 ArrayList<Future> futures = new ArrayList<>();
18 for (int i = 0; i < 20; i++) {
19 //提交任务
20 Future<Integer> future = service.submit(new CallableTask());
21 futures.add(future);
22 }
23
24
25 for (int i = 0; i < 20; i++) {
26 Future<Integer> future = futures.get(i);
27 try {
28 System.out.println(future.get());
29 } catch (InterruptedException e) {
30 e.printStackTrace();
31 } catch (ExecutionException e) {
32 e.printStackTrace();
33 }
34 }
35
36 service.shutdown();
37
38 }
39
40 static class CallableTask implements Callable<Integer> {
41
42 @Override
43 public Integer call() throws Exception {
44 Thread.sleep(3000);
45 return new Random().nextInt();
46 }
47 }
48}
12010666636
2-788311394
32011169962
484341953
5-1682064941
61859651109
7765166231
8739918142
9547060157
10-1931733995
111493835634
12-2114927045
132035652866
14-323151289
151598374730
162041121102
17-385438633
18750789799
19728797692
20-1753371291
get() 方法过程中抛出异常:
演示 get 方法过程中抛出异常,for 循环为了演示抛出 Exception 的时机:并不是说一产生异常就抛出,直到我们 get 执行时,才会抛出。
1/**
2 * 描述: 演示get方法过程中抛出异常,for循环为了演示抛出Exception的时机:并不是说一产生异常就抛出,直到我们get执行时,才会抛出。
3 */
4public class GetException {
5
6 public static void main(String[] args) {
7 ExecutorService service = Executors.newFixedThreadPool(20);
8 Future<Integer> future = service.submit(new CallableTask());
9
10
11 try {
12 for (int i = 0; i < 5; i++) {
13 System.out.println(i);
14 Thread.sleep(500);
15 }
16 System.out.println(future.isDone()); //true 线程执行完毕( 这里只关系线程是否运行结束,并不关心是否执行成功,抛出异常等… 只要运行结束就是 true)
17 future.get();
18 } catch (InterruptedException e) {
19 e.printStackTrace();
20 System.out.println("InterruptedException异常");
21 } catch (ExecutionException e) { //在这里捕获到
22 e.printStackTrace();
23 System.out.println("ExecutionException异常");
24 }
25 }
26
27 static class CallableTask implements Callable<Integer> {
28
29 @Override
30 public Integer call() throws Exception {
31 throw new IllegalArgumentException("Callable抛出异常");
32 }
33 }
34}
10
21
32
43
54
6true
7ExecutionException异常
8java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Callable抛出异常
9 at java.util.concurrent.FutureTask.report(FutureTask.java:122)
10 at java.util.concurrent.FutureTask.get(FutureTask.java:192)
11 at test.future.GetException.main(GetException.java:26)
cancel():演示 cancel 传入 true 和 false 的区别,代表是否中断正在执行的任务。
当被中断的任务有能力处理 interrupt 信号的时候可以传入 true。
当被中断的任务没有能力处理 interrupt 信号的时候(或者我也不清楚被中断的任务有没有这个能力),则可以传入 false。 或者想要停止那些还没有开始运行的任务,也可以传入 false。
1* 如果这个任务还没有开始执行,任务会被正常取消,未来也不会指定,方法返回 true。
2* 如果任务已完成,或者已经取消:那么 cancel() 方法会执行失败,返回 false.
3* 如果这个任务已经开始执行了,那么这个取消方法不会直接取消任务,而是会根据我们填的参数[true | false] 做判断: true 就发中断信号,flase 就不发中断信号(false 的最大意义在于阻止那些还未开始运行的任务,对于已经运行的任务则无效)
1import java.util.concurrent.Callable;
2import java.util.concurrent.ExecutionException;
3import java.util.concurrent.ExecutorService;
4import java.util.concurrent.Executors;
5import java.util.concurrent.Future;
6import java.util.concurrent.TimeUnit;
7import java.util.concurrent.TimeoutException;
8
9/**
10 * 描述: 演示get的超时方法,需要注意超时后处理,调用future.cancel()。演示cancel传入true和false的区别,代表是否中断正在执行的任务。
11 */
12public class Timeout {
13
14 private static final Ad DEFAULT_AD = new Ad("无网络时候的默认广告");
15 private static final ExecutorService exec = Executors.newFixedThreadPool(10);
16
17 static class Ad {
18
19 String name;
20
21 public Ad(String name) {
22 this.name = name;
23 }
24
25 @Override
26 public String toString() {
27 return "Ad{" +
28 "name='" + name + '\'' +
29 '}';
30 }
31 }
32
33
34 static class FetchAdTask implements Callable<Ad> {
35
36 @Override
37 public Ad call() throws Exception {
38 try {
39 Thread.sleep(3000);
40 } catch (InterruptedException e) {
41 System.out.println("sleep期间被中断了");
42 return new Ad("被中断时候的默认广告");//这里不生效,超时之后就已经没有意义了
43 }
44 return new Ad("旅游订票哪家强?找某程");
45 }
46 }
47
48
49 public void printAd() {
50 Future<Ad> f = exec.submit(new FetchAdTask());
51 Ad ad;
52 try {
53 ad = f.get(2000, TimeUnit.MILLISECONDS);
54 } catch (InterruptedException e) {
55 ad = new Ad("被中断时候的默认广告");
56 } catch (ExecutionException e) {
57 ad = new Ad("异常时候的默认广告");
58 } catch (TimeoutException e) { //超时之后应该主动取消线程执行
59 ad = new Ad("超时时候的默认广告"); //兜底数据
60 System.out.println("超时,未获取到广告");
61 boolean cancel = f.cancel(true);//true 对任务线程发起中断信号 false 则代表不会发起中断信号
62 System.out.println("cancel的结果:" + cancel);
63 }
64 exec.shutdown();
65 System.out.println(ad);
66 }
67
68 public static void main(String[] args) {
69 Timeout timeout = new Timeout();
70 timeout.printAd();
71 }
72}
1超时,未获取到广告
2cancel的结果:true
3sleep期间被中断了
4Ad{name='超时时候的默认广告'}
FutureTask 当作 Future 使用
- FutureTask 来获取 Future 和 任务的结果。
- FutureTask 是一种包装器,可以把 Callable 转换成 Futrue 和 Runnable,它同时实现二者的接口。FutureTask 既是 Runnable 又是 Futrue。
- 所以它既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。
FutrueTask 用法
把 Callable 实例当作参数,生成 FutureTask 的对象,然后把这个对象当作一个 Runnable 对象,用线程池或另起线程去执行这个对象,最后通过 FutureTask 获取刚才执行的结果。
1import java.util.concurrent.Callable;
2import java.util.concurrent.ExecutionException;
3import java.util.concurrent.ExecutorService;
4import java.util.concurrent.Executors;
5import java.util.concurrent.FutureTask;
6
7/**
8 * 描述: 演示FutureTask的用法
9 */
10public class FutureTaskDemo {
11
12 public static void main(String[] args) {
13 //构建 Callable 实例
14 Task task = new Task();
15
16 //构建 FutureTask 实例
17 FutureTask<Integer> integerFutureTask = new FutureTask<>(task);
18
19 //执行 FutureTask 实例
20 //new Thread(integerFutureTask).start(); //线程
21 ExecutorService service = Executors.newCachedThreadPool(); //线程池
22 service.submit(integerFutureTask);
23
24 try {
25 //获取 FutureTask 运行结果
26 System.out.println("task运行结果:"+integerFutureTask.get());
27 } catch (InterruptedException e) {
28 e.printStackTrace();
29 } catch (ExecutionException e) {
30 e.printStackTrace();
31 }
32 }
33}
34
35class Task implements Callable<Integer> {
36
37 @Override
38 public Integer call() throws Exception {
39 System.out.println("子线程正在计算");
40 Thread.sleep(3000);
41 int sum = 0;
42 for (int i = 0; i < 100; i++) {
43 sum += i;
44 }
45 return sum;
46 }
47}
1子线程正在计算
2task运行结果:4950
Future 注意点
- 当 for 循环批量获取 future 的结果时,容易发生一部分线程很慢的情况,get() 方法调用时应使用 timeout 限制。(多个任务,任务的内容不同,耗时也不同),会阻塞在最长的任务(等待)。
- Future 的生命周期不能后退。(一旦完成任务,它永久停留在“已完成”的状态,不能重头再来)