FutureTask 在线程池中应用和源码解析

2018 年 11 月 22 日 ImportNew

(点击上方公众号,可快速关注)


来源:hcy0411 ,

www.jianshu.com/p/1fac6476e85f



FutureTask 是一个支持取消的异步处理器,一般在线程池中用于异步接受callable返回值。


主要实现分三部分:


  1. 封装 Callable,然后放到线程池中去异步执行->run。

  2. 获取结果-> get。

  3. 取消任务-> cancel。


接下来主要学习下该模型如何实现。


举例说明FutureTask在线程池中的应用


// 第一步,定义线程池,

ExecutorService executor = new ThreadPoolExecutor(

        minPoolSize,

        maxPollSize,

        keepAliveTime,

        TimeUnit.SECONDS,

        new SynchronousQueue<>());

 

// 第二步,放到线程池中执行,返回FutureTask

FutureTask  task = executor.submit(callable);

 

// 第三步,获取返回值

T data = task.get();


学习FutureTask实现


类属性


//以下是FutureTask的各种状态

private volatile int state; 

private static final int NEW          = 0; 

private static final int COMPLETING   = 1;

private static final int NORMAL       = 2;

private static final int EXCEPTIONAL  = 3;

private static final int CANCELLED    = 4;

private static final int INTERRUPTING = 5;

private static final int INTERRUPTED  = 6;

 

private Callable<V> callable; //执行的任务

private Object outcome; //存储结果或者异常

private volatile Thread runner;//执行callable的线程

private volatile WaitNode waiters; //调用get方法等待获取结果的线程栈

 

其中各种状态存在 最终状态 status>COMPLETING

1)NEW -> COMPLETING -> NORMAL(有正常结果)

2) NEW -> COMPLETING -> EXCEPTIONAL(结果为异常) 

3) NEW -> CANCELLED(无结果) 

4) NEW -> INTERRUPTING -> INTERRUPTED(无结果)


类方法


从上面举例说明开始分析。


run()方法


FutureTask 继承 Runnable,ExecutorService submit 把提交的任务封装成 FutureTask 然后放到线程池 ThreadPoolExecutor 的 execute 执行。


public void run() {

    //如果不是初始状态或者cas设置运行线程是当前线程不成功,直接返回

    if (state != NEW ||

        !UNSAFE.compareAndSwapObject(this, runnerOffset,

                                     null, Thread.currentThread()))

        return;

    try {

        Callable<V> c = callable;

        if (c != null && state == NEW) {

            V result;

            boolean ran;

            try {

              // 执行callable任务 这里对异常进行了catch

                result = c.call();

                ran = true;

            } catch (Throwable ex) {

                result = null;

                ran = false;

                setException(ex); // 封装异常到outcome

            }

            if (ran)

                set(result);

        }

    } finally {

        runner = null;

        int s = state;

        // 这里如果是中断中,设置成最终状态

        if (s >= INTERRUPTING)

            handlePossibleCancellationInterrupt(s);

    }

}


以上是 run 方法源码实现很简单,解析如下:


  1. 如果不是始状态或者 cas 设置运行线程是当前线程不成功,直接返回,防止多个线程重复执行。

  2. 执行 Callable 的 call(),即提交执行任务(这里做了catch,会捕获执行任务的异常封装到 outcome 中)。

  3. 如果成功执行 set 方法,封装结果。


set 方法


protected void set(V v) {

    //cas方式设置成completing状态,防止多个线程同时处理

    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

        outcome = v; // 封装结果

        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终设置成normal状态

 

        finishCompletion();

    }

}


解析如下:


  1. cas方式设置成completing状态,防止多个线程同时处理

  2. 封装结果到outcome,然后设置到最终状态normal

  3. 执行finishCompletion方法。


finishCompletion方法


// state > COMPLETING; 不管异常,中断,还是执行完成,都需要执行该方法来唤醒调用get方法阻塞的线程

private void finishCompletion() {

    // assert state > COMPLETING;

    for (WaitNode q; (q = waiters) != null;) {

        // cas 设置waiters为null,防止多个线程执行。

        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

            // 循环唤醒所有等待结果的线程

            for (;;) {

                Thread t = q.thread;

                if (t != null) {

                    q.thread = null;

                    //唤醒线程

                    LockSupport.unpark(t);

                }

                WaitNode next = q.next;

                if (next == null)

                    break;

                q.next = null; // unlink to help gc

                q = next;

            }

            break;

        }

    }

   //该方法为空,可以被重写

    done();

    callable = null;        // to reduce footprint

}


解析如下:


遍历waiters中的等待节点,并通过 LockSupport 唤醒每一个节点,通知每个线程,该任务执行完成(可能是执行完成,也可能 cancel,异常等)。


以上就是执行的过程,接下来分析获取结果的过程->get。


get 方法


public V get() throws InterruptedException, ExecutionException {

    int s = state;

    if (s <= COMPLETING)

        s = awaitDone(false, 0L);

    return report(s);

}


public V get(long timeout, TimeUnit unit)

        throws InterruptedException, ExecutionException, TimeoutException {

        if (unit == null)

            throw new NullPointerException();

        int s = state;

        if (s <= COMPLETING &&

            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)

            throw new TimeoutException();

        return report(s);

    }


解析如下:


以上两个方法,原理一样,其中一个设置超时时间,支持最多阻塞多长时间。

状态如果小于 COMPLETING,说明还没到最终状态,(不管是否是成功、异常、取消)。

调用 awaitDone 方法阻塞线程,最终调用 report 方法返回结果。


awaitDone 方法


private int awaitDone(boolean timed, long nanos)

        throws InterruptedException {

        final long deadline = timed ? System.nanoTime() + nanos : 0L;

        WaitNode q = null;

        boolean queued = false;

        for (;;) {

            //线程可中断,如果当前阻塞获取结果线程执行interrupt()方法,则从队列中移除该节点,并抛出中断异常

            if (Thread.interrupted()) {

                removeWaiter(q);

                throw new InterruptedException();

            }

            int s = state;

            // 如果已经是最终状态,退出返回

            if (s > COMPLETING) {

                if (q != null)

                    q.thread = null;

                return s;

            }

            //这里做了个优化,competiting到最终状态时间很短,通过yield比挂起响应更快。

            else if (s == COMPLETING) // cannot time out yet

                Thread.yield();

            // 初始化该阻塞节点

            else if (q == null)

                q = new WaitNode();

            // cas方式写到阻塞waiters栈中

            else if (!queued)

                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,

                                                     q.next = waiters, q);

            // 这里做阻塞时间处理。

            else if (timed) {

                nanos = deadline - System.nanoTime();

                if (nanos <= 0L) {

                    removeWaiter(q);

                    return state;

                }

                // 阻塞线程,有超时时间

                LockSupport.parkNanos(this, nanos);

            }

            else

                // 阻塞线程

                LockSupport.park(this);

        }

    }


解析如下:


整体流程已写到注解中,整体实现是放在一个死循环中,唯一出口,是达到最终状态。

然后是构建节点元素,并将该节点入栈,同时阻塞当前线程等待运行主任务的线程唤醒该节点。


report 方法


private V report(int s) throws ExecutionException {

    Object x = outcome;

    if (s == NORMAL)

        return (V)x;

    if (s >= CANCELLED)

        throw new CancellationException();

    throw new ExecutionException((Throwable)x);

}


然后是report方法,如果是正常结束,返回结果,如果不是正常结束(取消,中断)抛出异常。


最后分析下取消流程。


cancel 方法


public boolean cancel(boolean mayInterruptIfRunning) {

    if (!(state == NEW &&

          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,

              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))

        return false;

    try {    // in case call to interrupt throws exception

        if (mayInterruptIfRunning) {

            try {

                Thread t = runner;

                if (t != null)

                    t.interrupt();

            } finally { // final state

                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);

            }

        }

    } finally {

        finishCompletion();

    }

    return true;

}


解析如下:


mayInterruptIfRunning参数是是否允许运行中被中断取消。


  1. 根据入参是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则直接返回false。

  2. 如果允许运行中被中断取消,调用runner.interupt()进行中断取消,设置状态为INTERRUPTED。

  3. 唤醒所有在get()方法等待的线程。


此处有两种状态转换:


  1. 如果mayInterruptIfRunning为true:status状态转换为 new -> INTERRUPTING->INTERRUPTED。主动去中断执行线程,然后唤醒所有等待结果的线程。

  2. 如果mayInterruptIfRunning为false:status状态转换为 new -> CANCELLED。


不会去中断执行线程,直接唤醒所有等待结果的线程,从 awaitDone 方法中可以看到,唤醒等待线程后,直接从跳转回 get 方法,然后把结果返回给获取结果的线程,当然此时的结果是 null。


总结


以上就是 FutureTask 的源码简单解析,实现比较简单,FutureTask 就是一个实现 Future 模式,支持取消的异步处理器。


看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

登录查看更多
0

相关内容

多智能体深度强化学习的若干关键科学问题
专知会员服务
186+阅读 · 2020年5月24日
可解释强化学习,Explainable Reinforcement Learning: A Survey
专知会员服务
129+阅读 · 2020年5月14日
100+篇《自监督学习(Self-Supervised Learning)》论文最新合集
专知会员服务
164+阅读 · 2020年3月18日
近期必读的7篇 CVPR 2019【视觉问答】相关论文和代码
专知会员服务
35+阅读 · 2020年1月10日
必读的10篇 CVPR 2019【生成对抗网络】相关论文和代码
专知会员服务
31+阅读 · 2020年1月10日
Cayley图数据库的可视化(Visualize)
Python开发者
5+阅读 · 2019年9月9日
漏洞预警丨Xstream远程代码执行漏洞
FreeBuf
4+阅读 · 2019年7月25日
用 GitLab 的 Merge Request 做代码评审
DevOps时代
4+阅读 · 2019年5月5日
1500+ FPS!目前最快的CNN人脸检测算法开源
极市平台
25+阅读 · 2019年3月15日
注意力机制(Attention)最新综述论文及相关源码
人工智能学家
30+阅读 · 2018年11月17日
半监督多任务学习:Semisupervised Multitask Learning
我爱读PAMI
18+阅读 · 2018年4月29日
浅谈浏览器 http 的缓存机制
前端大全
6+阅读 · 2018年1月21日
Capsule Networks解析
机器学习研究会
11+阅读 · 2017年11月12日
Self-Driving Cars: A Survey
Arxiv
41+阅读 · 2019年1月14日
Arxiv
6+阅读 · 2018年1月29日
VIP会员
相关资讯
Cayley图数据库的可视化(Visualize)
Python开发者
5+阅读 · 2019年9月9日
漏洞预警丨Xstream远程代码执行漏洞
FreeBuf
4+阅读 · 2019年7月25日
用 GitLab 的 Merge Request 做代码评审
DevOps时代
4+阅读 · 2019年5月5日
1500+ FPS!目前最快的CNN人脸检测算法开源
极市平台
25+阅读 · 2019年3月15日
注意力机制(Attention)最新综述论文及相关源码
人工智能学家
30+阅读 · 2018年11月17日
半监督多任务学习:Semisupervised Multitask Learning
我爱读PAMI
18+阅读 · 2018年4月29日
浅谈浏览器 http 的缓存机制
前端大全
6+阅读 · 2018年1月21日
Capsule Networks解析
机器学习研究会
11+阅读 · 2017年11月12日
Top
微信扫码咨询专知VIP会员