目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

Future、Callable

Runnable 缺陷

  • 不能返回一个返回值
  • run() 方法中不能抛出 checked Exception (方法签名),只能在 run() 方法中使用 try/catch 进行异常捕获。
1public abstract void run();

Callable 接口

  • 类似于 Runnable,被其他线程执行的任务
  • 实现 call 方法
  • 有返回值

Future

 Future 的核心思想是:一个方法的计算过程可能非常耗时,一直在原地等待方法返回,显然不明智。可以把该计算过程放到线程池去执行,并通过 Future 去控制方法的计算过程,在计算出结果后直接获取该结果。

Callable 和 Future 的关系

  • Future.get() 来获取 Callable 接口返回的执行结果,Future.isDone()来判断任务是否已经执行完了,以及取消这个任务,限时获取任务的结果等。
  • 在 call() 未执行完毕之前,调用 get() 的线程(假设是主线程)会被阻塞,直到 call() 方法返回了结果后,此时 future.get() 才会得到该结果,然后主线程 CIA 会切换到 runnable 状态。
  • Future 是一个存储器,他存储了 call() 这个任务的结果,而这个任务的执行时间是无法提前确定的,因为这完全取决于 call() 方法执行的情况

Future 的常用方法

  • get() 方法的行为取决于 Callable 任务的状态,只有以下 5 种情况:
11. 任务正常完成:get() 方法会立刻返回结果
22. 任务尚未完成(任务还没开始或进行中)get() 将阻塞并直到任务完成。
33. 任务执行过程中抛出 Exceptionget() 方法会抛出 `ExecutionException` (这里抛出的`ExecutionException` call() 执行时产生的异常,不论 call() 执行时抛出异常的类型是什么,最后get() 方法抛出的异常类型都是 `ExecutionException`
44. 任务被取消: get() 方法会抛出 `CancellationException`
55. 任务超时: get() 方法有一个重载方法, 是传入一个延迟时间的,如果时间到了还没有获得结果, get() 方法就会抛出 TimeoutException
  • get(long timeout, TimeUnit unit):有超时的获取
1* 超时的需要很常见
2* 用 get(long timeout,TimeUnit unit) 方法时,如果 call() 在规定时间内完成了任务,那么就会正常获取到返回值;而如果在指定时间内没有计算出结果,那么就会抛出 TimeoutException。
3* 超时不获取,任务需要取消(已经不需要了, 因为超时了,取消吧!)
  • isDone() 方法:判断线程是否执行完毕
  • isCancelled() 方法:判断是否被取消
  • cancel():取消任务
1* 取消任务的执行

ThreadPool 的 submit 方法返回 Future 对象

submit

 首先,我们要给线程池提交我们的任务,提交时线程池会立刻返回给我们一个空的 Future 容器。当线程的任务一旦执行完毕,也就是当我们可以获取结果的时候,线程池便会把该结果填入到空的 Future 容器中,此时便可以从 Future 中获得任务的执行结果。

Future 正常的使用方式

 1import java.util.Random;
 2import java.util.concurrent.Callable;
 3import java.util.concurrent.ExecutionException;
 4import java.util.concurrent.ExecutorService;
 5import java.util.concurrent.Executors;
 6import java.util.concurrent.Future;
 7
 8/**
 9 * 描述:     演示一个Future的使用方法
10 */
11public class OneFuture {
12
13    public static void main(String[] args) {
14        ExecutorService service = Executors.newFixedThreadPool(10);
15        Future<Integer> future = service.submit(new CallableTask());
16        try {
17            System.out.println(future.get());
18        } catch (InterruptedException e) {
19            e.printStackTrace();
20        } catch (ExecutionException e) {
21            e.printStackTrace();
22        }
23        service.shutdown();
24    }
25
26    static class CallableTask implements Callable<Integer> {
27
28        @Override
29        public Integer call() throws Exception {
30            Thread.sleep(3000);
31            return new Random().nextInt(); //2129913364
32        }
33    }
34
35}

Future 的 Lambda 形式

 1import java.util.Random;
 2import java.util.concurrent.Callable;
 3import java.util.concurrent.ExecutionException;
 4import java.util.concurrent.ExecutorService;
 5import java.util.concurrent.Executors;
 6import java.util.concurrent.Future;
 7
 8/**
 9 * 描述:     演示一个Future的使用方法,lambda形式
10 */
11public class OneFutureLambda {
12
13    public static void main(String[] args) {
14        ExecutorService service = Executors.newFixedThreadPool(10);
15
16        //Lambda 形式
17        Callable<Integer> callable = () -> {
18            Thread.sleep(3000);
19            return new Random().nextInt(); //2129913364
20        };
21
22        Future<Integer> future = service.submit(callable);
23        try {
24            System.out.println(future.get());
25        } catch (InterruptedException e) {
26            e.printStackTrace();
27        } catch (ExecutionException e) {
28            e.printStackTrace();
29        }
30        service.shutdown();
31    }
32
33}

多个任务:使用 Futrue 数组来获取结果

 1import java.util.ArrayList;
 2import java.util.Random;
 3import java.util.concurrent.Callable;
 4import java.util.concurrent.ExecutionException;
 5import java.util.concurrent.ExecutorService;
 6import java.util.concurrent.Executors;
 7import java.util.concurrent.Future;
 8
 9/**
10 * 描述:     演示批量提交任务时,用List来批量接收多个任务的结果
11 */
12public class MultiFutures {
13
14    public static void main(String[] args) throws InterruptedException {
15        ExecutorService service = Executors.newFixedThreadPool(20);
16
17        ArrayList<Future> futures = new ArrayList<>();
18        for (int i = 0; i < 20; i++) {
19            //提交任务
20            Future<Integer> future = service.submit(new CallableTask());
21            futures.add(future);
22        }
23
24
25        for (int i = 0; i < 20; i++) {
26            Future<Integer> future = futures.get(i);
27            try {
28                System.out.println(future.get());
29            } catch (InterruptedException e) {
30                e.printStackTrace();
31            } catch (ExecutionException e) {
32                e.printStackTrace();
33            }
34        }
35
36        service.shutdown();
37
38    }
39
40    static class CallableTask implements Callable<Integer> {
41
42        @Override
43        public Integer call() throws Exception {
44            Thread.sleep(3000);
45            return new Random().nextInt();
46        }
47    }
48}
 12010666636
 2-788311394
 32011169962
 484341953
 5-1682064941
 61859651109
 7765166231
 8739918142
 9547060157
10-1931733995
111493835634
12-2114927045
132035652866
14-323151289
151598374730
162041121102
17-385438633
18750789799
19728797692
20-1753371291

get() 方法过程中抛出异常:
 演示 get 方法过程中抛出异常,for 循环为了演示抛出 Exception 的时机:并不是说一产生异常就抛出,直到我们 get 执行时,才会抛出。

 1/**
 2 * 描述:     演示get方法过程中抛出异常,for循环为了演示抛出Exception的时机:并不是说一产生异常就抛出,直到我们get执行时,才会抛出。
 3 */
 4public class GetException {
 5
 6    public static void main(String[] args) {
 7        ExecutorService service = Executors.newFixedThreadPool(20);
 8        Future<Integer> future = service.submit(new CallableTask());
 9
10
11        try {
12            for (int i = 0; i < 5; i++) {
13                System.out.println(i);
14                Thread.sleep(500);
15            }
16            System.out.println(future.isDone()); //true 线程执行完毕( 这里只关系线程是否运行结束,并不关心是否执行成功,抛出异常等…  只要运行结束就是 true)
17            future.get();
18        } catch (InterruptedException e) {
19            e.printStackTrace();
20            System.out.println("InterruptedException异常");
21        } catch (ExecutionException e) { //在这里捕获到
22            e.printStackTrace();
23            System.out.println("ExecutionException异常");
24        }
25    }
26
27    static class CallableTask implements Callable<Integer> {
28
29        @Override
30        public Integer call() throws Exception {
31            throw new IllegalArgumentException("Callable抛出异常");
32        }
33    }
34}
 10
 21
 32
 43
 54
 6true
 7ExecutionException异常
 8java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Callable抛出异常
 9	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
10	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
11	at test.future.GetException.main(GetException.java:26)

cancel():演示 cancel 传入 true 和 false 的区别,代表是否中断正在执行的任务。
 当被中断的任务有能力处理 interrupt 信号的时候可以传入 true。
 当被中断的任务没有能力处理 interrupt 信号的时候(或者我也不清楚被中断的任务有没有这个能力),则可以传入 false。 或者想要停止那些还没有开始运行的任务,也可以传入 false。

1* 如果这个任务还没有开始执行,任务会被正常取消,未来也不会指定,方法返回 true。
2* 如果任务已完成,或者已经取消:那么 cancel() 方法会执行失败,返回 false.
3* 如果这个任务已经开始执行了,那么这个取消方法不会直接取消任务,而是会根据我们填的参数[true | false] 做判断: true 就发中断信号,flase 就不发中断信号(false 的最大意义在于阻止那些还未开始运行的任务,对于已经运行的任务则无效)
 1import java.util.concurrent.Callable;
 2import java.util.concurrent.ExecutionException;
 3import java.util.concurrent.ExecutorService;
 4import java.util.concurrent.Executors;
 5import java.util.concurrent.Future;
 6import java.util.concurrent.TimeUnit;
 7import java.util.concurrent.TimeoutException;
 8
 9/**
10 * 描述:     演示get的超时方法,需要注意超时后处理,调用future.cancel()。演示cancel传入true和false的区别,代表是否中断正在执行的任务。
11 */
12public class Timeout {
13
14    private static final Ad DEFAULT_AD = new Ad("无网络时候的默认广告");
15    private static final ExecutorService exec = Executors.newFixedThreadPool(10);
16
17    static class Ad {
18
19        String name;
20
21        public Ad(String name) {
22            this.name = name;
23        }
24
25        @Override
26        public String toString() {
27            return "Ad{" +
28                    "name='" + name + '\'' +
29                    '}';
30        }
31    }
32
33
34    static class FetchAdTask implements Callable<Ad> {
35
36        @Override
37        public Ad call() throws Exception {
38            try {
39                Thread.sleep(3000);
40            } catch (InterruptedException e) {
41                System.out.println("sleep期间被中断了");
42                return new Ad("被中断时候的默认广告");//这里不生效,超时之后就已经没有意义了
43            }
44            return new Ad("旅游订票哪家强?找某程");
45        }
46    }
47
48
49    public void printAd() {
50        Future<Ad> f = exec.submit(new FetchAdTask());
51        Ad ad;
52        try {
53            ad = f.get(2000, TimeUnit.MILLISECONDS);
54        } catch (InterruptedException e) {
55            ad = new Ad("被中断时候的默认广告");
56        } catch (ExecutionException e) {
57            ad = new Ad("异常时候的默认广告");
58        } catch (TimeoutException e) { //超时之后应该主动取消线程执行
59            ad = new Ad("超时时候的默认广告"); //兜底数据
60            System.out.println("超时,未获取到广告");
61            boolean cancel = f.cancel(true);//true 对任务线程发起中断信号  false 则代表不会发起中断信号
62            System.out.println("cancel的结果:" + cancel);
63        }
64        exec.shutdown();
65        System.out.println(ad);
66    }
67
68    public static void main(String[] args) {
69        Timeout timeout = new Timeout();
70        timeout.printAd();
71    }
72}
1超时,未获取到广告
2cancel的结果:true
3sleep期间被中断了
4Ad{name='超时时候的默认广告'}

FutureTask 当作 Future 使用

  • FutureTask 来获取 Future 和 任务的结果。
  • FutureTask 是一种包装器,可以把 Callable 转换成 Futrue 和 Runnable,它同时实现二者的接口。FutureTask 既是 Runnable 又是 Futrue。
  • 所以它既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。

FutrueTask 用法
 把 Callable 实例当作参数,生成 FutureTask 的对象,然后把这个对象当作一个 Runnable 对象,用线程池或另起线程去执行这个对象,最后通过 FutureTask 获取刚才执行的结果。

 1import java.util.concurrent.Callable;
 2import java.util.concurrent.ExecutionException;
 3import java.util.concurrent.ExecutorService;
 4import java.util.concurrent.Executors;
 5import java.util.concurrent.FutureTask;
 6
 7/**
 8 * 描述:     演示FutureTask的用法
 9 */
10public class FutureTaskDemo {
11
12    public static void main(String[] args) {
13        //构建 Callable 实例
14        Task task = new Task();
15
16        //构建 FutureTask 实例
17        FutureTask<Integer> integerFutureTask = new FutureTask<>(task);
18
19        //执行 FutureTask 实例
20        //new Thread(integerFutureTask).start();    //线程
21        ExecutorService service = Executors.newCachedThreadPool();  //线程池
22        service.submit(integerFutureTask);
23
24        try {
25            //获取 FutureTask 运行结果
26            System.out.println("task运行结果:"+integerFutureTask.get());
27        } catch (InterruptedException e) {
28            e.printStackTrace();
29        } catch (ExecutionException e) {
30            e.printStackTrace();
31        }
32    }
33}
34
35class Task implements Callable<Integer> {
36
37    @Override
38    public Integer call() throws Exception {
39        System.out.println("子线程正在计算");
40        Thread.sleep(3000);
41        int sum = 0;
42        for (int i = 0; i < 100; i++) {
43            sum += i;
44        }
45        return sum;
46    }
47}
1子线程正在计算
2task运行结果:4950

Future 注意点

  • 当 for 循环批量获取 future 的结果时,容易发生一部分线程很慢的情况,get() 方法调用时应使用 timeout 限制。(多个任务,任务的内容不同,耗时也不同),会阻塞在最长的任务(等待)。
  • Future 的生命周期不能后退。(一旦完成任务,它永久停留在“已完成”的状态,不能重头再来)

作者:Soulboy