目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

Future、Callable

Runnable 缺陷

  • 不能返回一个返回值
  • run() 方法中不能抛出 checked Exception (方法签名),只能在 run() 方法中使用 try/catch 进行异常捕获。
public abstract void run();

Callable 接口

  • 类似于 Runnable,被其他线程执行的任务
  • 实现 call 方法
  • 有返回值

Future

 Future 的核心思想是:一个方法的计算过程可能非常耗时,一直在原地等待方法返回,显然不明智。可以把该计算过程放到线程池去执行,并通过 Future 去控制方法的计算过程,在计算出结果后直接获取该结果。

Callable 和 Future 的关系

  • Future.get() 来获取 Callable 接口返回的执行结果,Future.isDone()来判断任务是否已经执行完了,以及取消这个任务,限时获取任务的结果等。
  • 在 call() 未执行完毕之前,调用 get() 的线程(假设是主线程)会被阻塞,直到 call() 方法返回了结果后,此时 future.get() 才会得到该结果,然后主线程 CIA 会切换到 runnable 状态。
  • Future 是一个存储器,他存储了 call() 这个任务的结果,而这个任务的执行时间是无法提前确定的,因为这完全取决于 call() 方法执行的情况

Future 的常用方法

  • get() 方法的行为取决于 Callable 任务的状态,只有以下 5 种情况:
1. 任务正常完成:get() 方法会立刻返回结果
2. 任务尚未完成(任务还没开始或进行中):get() 将阻塞并直到任务完成。
3. 任务执行过程中抛出 Exception:get() 方法会抛出 `ExecutionException` (这里抛出的`ExecutionException`是 call() 执行时产生的异常,不论 call() 执行时抛出异常的类型是什么,最后get() 方法抛出的异常类型都是 `ExecutionException`)
4. 任务被取消: get() 方法会抛出 `CancellationException`
5. 任务超时: get() 方法有一个重载方法, 是传入一个延迟时间的,如果时间到了还没有获得结果, get() 方法就会抛出 TimeoutException。
  • get(long timeout, TimeUnit unit):有超时的获取
* 超时的需要很常见
* 用 get(long timeout,TimeUnit unit) 方法时,如果 call() 在规定时间内完成了任务,那么就会正常获取到返回值;而如果在指定时间内没有计算出结果,那么就会抛出 TimeoutException。
* 超时不获取,任务需要取消(已经不需要了, 因为超时了,取消吧!)
  • isDone() 方法:判断线程是否执行完毕
  • isCancelled() 方法:判断是否被取消
  • cancel():取消任务
* 取消任务的执行

ThreadPool 的 submit 方法返回 Future 对象

submit

 首先,我们要给线程池提交我们的任务,提交时线程池会立刻返回给我们一个空的 Future 容器。当线程的任务一旦执行完毕,也就是当我们可以获取结果的时候,线程池便会把该结果填入到空的 Future 容器中,此时便可以从 Future 中获得任务的执行结果。

Future 正常的使用方式

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 描述:     演示一个Future的使用方法
 */
public class OneFuture {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(10);
        Future<Integer> future = service.submit(new CallableTask());
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        service.shutdown();
    }

    static class CallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            Thread.sleep(3000);
            return new Random().nextInt(); //2129913364
        }
    }

}

Future 的 Lambda 形式

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 描述:     演示一个Future的使用方法,lambda形式
 */
public class OneFutureLambda {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(10);

        //Lambda 形式
        Callable<Integer> callable = () -> {
            Thread.sleep(3000);
            return new Random().nextInt(); //2129913364
        };

        Future<Integer> future = service.submit(callable);
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        service.shutdown();
    }

}

多个任务:使用 Futrue 数组来获取结果

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 描述:     演示批量提交任务时,用List来批量接收多个任务的结果
 */
public class MultiFutures {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(20);

        ArrayList<Future> futures = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            //提交任务
            Future<Integer> future = service.submit(new CallableTask());
            futures.add(future);
        }


        for (int i = 0; i < 20; i++) {
            Future<Integer> future = futures.get(i);
            try {
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        service.shutdown();

    }

    static class CallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            Thread.sleep(3000);
            return new Random().nextInt();
        }
    }
}
2010666636
-788311394
2011169962
84341953
-1682064941
1859651109
765166231
739918142
547060157
-1931733995
1493835634
-2114927045
2035652866
-323151289
1598374730
2041121102
-385438633
750789799
728797692
-1753371291

get() 方法过程中抛出异常:
 演示 get 方法过程中抛出异常,for 循环为了演示抛出 Exception 的时机:并不是说一产生异常就抛出,直到我们 get 执行时,才会抛出。

/**
 * 描述:     演示get方法过程中抛出异常,for循环为了演示抛出Exception的时机:并不是说一产生异常就抛出,直到我们get执行时,才会抛出。
 */
public class GetException {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(20);
        Future<Integer> future = service.submit(new CallableTask());


        try {
            for (int i = 0; i < 5; i++) {
                System.out.println(i);
                Thread.sleep(500);
            }
            System.out.println(future.isDone()); //true 线程执行完毕( 这里只关系线程是否运行结束,并不关心是否执行成功,抛出异常等…  只要运行结束就是 true)
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("InterruptedException异常");
        } catch (ExecutionException e) { //在这里捕获到
            e.printStackTrace();
            System.out.println("ExecutionException异常");
        }
    }

    static class CallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            throw new IllegalArgumentException("Callable抛出异常");
        }
    }
}
0
1
2
3
4
true
ExecutionException异常
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Callable抛出异常
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at test.future.GetException.main(GetException.java:26)

cancel():演示 cancel 传入 true 和 false 的区别,代表是否中断正在执行的任务。
 当被中断的任务有能力处理 interrupt 信号的时候可以传入 true。
 当被中断的任务没有能力处理 interrupt 信号的时候(或者我也不清楚被中断的任务有没有这个能力),则可以传入 false。 或者想要停止那些还没有开始运行的任务,也可以传入 false。

* 如果这个任务还没有开始执行,任务会被正常取消,未来也不会指定,方法返回 true。
* 如果任务已完成,或者已经取消:那么 cancel() 方法会执行失败,返回 false.
* 如果这个任务已经开始执行了,那么这个取消方法不会直接取消任务,而是会根据我们填的参数[true | false] 做判断: true 就发中断信号,flase 就不发中断信号(false 的最大意义在于阻止那些还未开始运行的任务,对于已经运行的任务则无效)
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * 描述:     演示get的超时方法,需要注意超时后处理,调用future.cancel()。演示cancel传入true和false的区别,代表是否中断正在执行的任务。
 */
public class Timeout {

    private static final Ad DEFAULT_AD = new Ad("无网络时候的默认广告");
    private static final ExecutorService exec = Executors.newFixedThreadPool(10);

    static class Ad {

        String name;

        public Ad(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Ad{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }


    static class FetchAdTask implements Callable<Ad> {

        @Override
        public Ad call() throws Exception {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                System.out.println("sleep期间被中断了");
                return new Ad("被中断时候的默认广告");//这里不生效,超时之后就已经没有意义了
            }
            return new Ad("旅游订票哪家强?找某程");
        }
    }


    public void printAd() {
        Future<Ad> f = exec.submit(new FetchAdTask());
        Ad ad;
        try {
            ad = f.get(2000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            ad = new Ad("被中断时候的默认广告");
        } catch (ExecutionException e) {
            ad = new Ad("异常时候的默认广告");
        } catch (TimeoutException e) { //超时之后应该主动取消线程执行
            ad = new Ad("超时时候的默认广告"); //兜底数据
            System.out.println("超时,未获取到广告");
            boolean cancel = f.cancel(true);//true 对任务线程发起中断信号  false 则代表不会发起中断信号
            System.out.println("cancel的结果:" + cancel);
        }
        exec.shutdown();
        System.out.println(ad);
    }

    public static void main(String[] args) {
        Timeout timeout = new Timeout();
        timeout.printAd();
    }
}
超时,未获取到广告
cancel的结果:true
sleep期间被中断了
Ad{name='超时时候的默认广告'}

FutureTask 当作 Future 使用

  • FutureTask 来获取 Future 和 任务的结果。
  • FutureTask 是一种包装器,可以把 Callable 转换成 Futrue 和 Runnable,它同时实现二者的接口。FutureTask 既是 Runnable 又是 Futrue。
  • 所以它既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。

FutrueTask 用法
 把 Callable 实例当作参数,生成 FutureTask 的对象,然后把这个对象当作一个 Runnable 对象,用线程池或另起线程去执行这个对象,最后通过 FutureTask 获取刚才执行的结果。

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * 描述:     演示FutureTask的用法
 */
public class FutureTaskDemo {

    public static void main(String[] args) {
        //构建 Callable 实例
        Task task = new Task();

        //构建 FutureTask 实例
        FutureTask<Integer> integerFutureTask = new FutureTask<>(task);

        //执行 FutureTask 实例
        //new Thread(integerFutureTask).start();    //线程
        ExecutorService service = Executors.newCachedThreadPool();  //线程池
        service.submit(integerFutureTask);

        try {
            //获取 FutureTask 运行结果
            System.out.println("task运行结果:"+integerFutureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

class Task implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("子线程正在计算");
        Thread.sleep(3000);
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum += i;
        }
        return sum;
    }
}
子线程正在计算
task运行结果:4950

Future 注意点

  • 当 for 循环批量获取 future 的结果时,容易发生一部分线程很慢的情况,get() 方法调用时应使用 timeout 限制。(多个任务,任务的内容不同,耗时也不同),会阻塞在最长的任务(等待)。
  • Future 的生命周期不能后退。(一旦完成任务,它永久停留在“已完成”的状态,不能重头再来)

作者:Soulboy