博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Future、FutureTask实现原理浅析(源码解读)
阅读量:6533 次
发布时间:2019-06-24

本文共 4812 字,大约阅读时间需要 16 分钟。

前言

最近一直在看JUC下面的一些东西,发现很多东西都是以前用过,但是真是到原理层面自己还是很欠缺。

刚好趁这段时间不太忙,回来了便一点点学习总结。

前言

最近一直在看JUC下面的一些东西,发现很多东西都是以前用过,但是真是到原理层面自己还是很欠缺。

刚好趁这段时间不太忙,回来了便一点点学习总结。
由于自己水平有限,可能存在大量漏洞和思考不周到的地方,不吝赐教。

Future 模式

一种非常经典的设计模式,这种设计模式主要就利用空间换时间得到概念,也就是说异步执行(需要开启一个新的线程)。

在互联网高并发的应用服务中,我们随处可见这种理念和代码,主要就是使用了这种模式。
Future模式非常适合在处理耗时很长的业务逻辑时进行使用,可以有效的减小系统的响应时间,提高系统的吞吐量。

Future代码示例:

/** * @Description: * @Author: wangmeng * @Date: 2018/12/17-20:44 */public class UseFuture implements Callable
{ private String param; public UseFuture(String param) { this.param = param; } @Override public String call() throws Exception { //模拟执行业务逻辑的耗时 TimeUnit.SECONDS.sleep(3); String result = this.param + " 处理完成!"; return result; } public static void main(String[] args) throws Exception{ String queryStr = "query1"; String queryStr2 = "query2"; FutureTask
future1 = new FutureTask
(new UseFuture(queryStr)); FutureTask
future2 = new FutureTask
(new UseFuture(queryStr2)); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(future1);//异步操作 executorService.submit(future2);//异步操作 System.out.println("执行中..."); TimeUnit.SECONDS.sleep(2);//处理其他相关的任务。 String result1 = future1.get(); String result2 = future2.get(); System.out.println("数据处理完成。。" + result1); System.out.println("数据处理完成。。" + result2); }}

应用场景

Future模式有点类似于商品订单,比如在网购时,当看中某一件商品时,就可以提交订单,当订单处理完成后,在家等待商品送货上门即可。

或者说更形象的我们发送Ajax请求的时候,页面是异步的进行后台处理,用户无需一直等待请求的结果,可以继续浏览或操作其他内容。
流程图

Future实现原理

看到上面示例代码,我们是通过executorService.submit(future1) 来提交线程的,进一步看看里面具体的逻辑。

1、 AbstractExecutorService 中submit()源码:

submit

2、FutureTask中run()源码:

public void run() {        if (state != NEW ||            !UNSAFE.compareAndSwapObject(this, runnerOffset,                                         null, Thread.currentThread()))            return;        try {            Callable
c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }

这个是核心代码,首先我们需要知道FutureTask中有一个volatile state全局变量,通过这个值来界定任务是否已经执行完毕。

state

将上面run方法一点点拆解如下:
run方法
先判断state状态,如果不是NEW说明执行完毕,直接return掉。
后面使用CAS操作,判断这个任务是否已经执行,这里FutureTask有个全局的volatile runner字段,这里通过cas将当前线程指定给runner。
这里可以防止callable被执行多次。
接着往下看:
run方法2

查看set方法具体实现:

set方法

继续往下跟,查看finishCompletion方法:

FutureTask中有一个WaiteNode单链表,当执行futureTask.get()方法时,多个线程会将等待的线程的next指向下一个想要get获取结果的线程。
finishCompletion主要就是使用Unsafe.unpark()进行唤醒操作。
finish

3,FutureTask.get() 源码

get() 方法会进行自旋操作等待,直到FutureTask中的state状态大于NORMAL(表示自行完成),然后才会通过FutureTask的outcome获取返回值。
get
接着往下跟awaitDone方法:

private int awaitDone(boolean timed, long nanos)        throws InterruptedException {        final long deadline = timed ? System.nanoTime() + nanos : 0L;        WaitNode q = null;        boolean queued = false;        for (;;) {            if (Thread.interrupted()) {                removeWaiter(q);                throw new InterruptedException();            }            int s = state;            if (s > COMPLETING) {                if (q != null)                    q.thread = null;                return s;            }            else if (s == COMPLETING) // cannot time out yet                Thread.yield();            else if (q == null)                q = new WaitNode();            else if (!queued)                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                                                     q.next = waiters, q);            else if (timed) {                nanos = deadline - System.nanoTime();                if (nanos <= 0L) {                    removeWaiter(q);                    return state;                }                LockSupport.parkNanos(this, nanos);            }            else                LockSupport.park(this);        }    }

还是老样子,一点点分析:

await
await2

不知道自己理解的是否有偏差,有问题欢迎大家随时指出,感谢备至。

到了这里 就不再讲解了,后面还有report、cancel等方法,大家可以自行参阅源码。

总结

结合上述分析可得 FutureTask 执行活动图如下:

流程图
同时也可以看出,在 FutureTask 中内部维护了一个单向链表 waiters , 在执行 get 的时候会向其中添加节点:
waiters

最后特别感谢掘金此位博主的分享,参考如下:

转载地址:http://xlqbo.baihongyu.com/

你可能感兴趣的文章
异构数据库
查看>>
透视校正插值
查看>>
Cobertura代码覆盖率测试
查看>>
【selenium学习笔记一】python + selenium定位页面元素的办法。
查看>>
Linux禁止ping
查看>>
【Matplotlib】 标注一些点
查看>>
[AX]乐观并发控制Optimistic Concurrency Control
查看>>
自定义类加载器
查看>>
MySQL数据库事务各隔离级别加锁情况--Repeatable Read && MVCC(转)
查看>>
C++构造函数例程
查看>>
把某一列值转换为逗号分隔字符串
查看>>
DLL,DML,DCL,TCL in Oracle
查看>>
iOS中--NSArray调用方法详解 (李洪强)
查看>>
java异步操作实例
查看>>
Centos6.8防火墙配置
查看>>
[精讲17] 组策略
查看>>
如何在Rancher上运行Elasticsearch
查看>>
shell 找出数组元素中的最大值
查看>>
Vmware虚拟机linux系统混合模式上网
查看>>
MySQL在导入的时候遇到的错误
查看>>