按 CompletableFuture 完成顺序实现 Streaming Future

2019 年 5 月 28 日 ImportNew

(给ImportNew加星标,提高Java技能)

编译:ImportNew/唐尤华

Java 8 给引入了 `CompletableFuture` 和 Stream API 这样的工具。让我们尝试把它们结合起来,创建一个 Stream 在 future 完成时返回一组 `CompletableFutures` 集合。


在 [parallel-collectors][1] V1.0.0 开发中也使用了这种方法。


[1]:https://github.com/pivovarit/parallel-collectors


把 CompletableFuture 转成 Steam


基本上,我们要做的就是设计一种方案,把一组 future 集合转换成由任务返回值组成的 Steam:


```java
Collection<CompletableFuture<T>> -> Stream<T>
```


在 Java 的世界里,这可以通过使用 `static` 方法实现:


```java
public static <T> Stream<T> inCompletionOrder(Collection<CompletableFuture<T>> futures) {
// ...
}
```


要创建自定义 `Stream`,需要自己实现一个 `java.util.Spliterator`:


```java
final class CompletionOrderSpliterator<T>
implements Spliterator<T> { ... }
```


下面是 `static` 方法的具体实现:


```java
public static <T> Stream<T> completionOrder(Collection<CompletableFuture<T>> futures) {
return StreamSupport.stream(
new CompletionOrderSpliterator<>(futures), false);
}
```


这部分相对简单,现在让我们实现 `CompletionOrderSpliterator`。


实现 CompletionOrderSpliterator


要实现自定义 `Spliterator`,需要完成下列方法:


```java
final class CompletionOrderSpliterator<T> implements Spliterator<T> {
CompletionOrderSpliterator(Collection<CompletableFuture<T>> futures) {
// TODO
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
// TODO
}
@Override
public Spliterator<T> trySplit() {
// TODO
}
@Override
public long estimateSize()
{
// TODO
}
@Override
public int characteristics()
{
// TODO
}
}
```


当然,构造函数也要实现。


"最直接的解决方法:拷贝传入的集合,等待 future 完成,把完成的 future 从集合里移除,把结果传给 `Spliterator`。"


使用 `CompletableFuture#anyOf` 可等待 future 完成,并且默认实现了正确的异常处理。


然而,还有一个问题略显复杂。


如果仔细查看 `CompletableFuture#anyOf` 方法,会发现它不是很实用,因为要求传入多个 `CompletableFutures<?>` 然后返回一个 `CompletableFuture< Object>` 对象,但这不是主要问题,只是稍有不便。


真正的问题在于,方法返回的 `CompletableFuture<Object>` 对象并不是第一个完成的 future,而是当有任何一个 future 完成时新建的 `CompletableFuture` 实例。


这种方案把"等待 future 完成,然后从列表移除"变复杂了。"我们不能依赖引用想等性,所以要么在 `CompletableFuture#anyof` 触发后执行线性扫描,要么试着想出更好的办法。"


> 译注:"Reference Equality 引用相等性"是对象相等性的一部分,在两个被比较的引用都指向同一个对象的情况下,通过使用 `==` 而不是进一步进行对象比较。


一种简单的解决方案:


```java
private T takeNextCompleted()
{
anyOf(futureQueue.toArray(new CompletableFuture[0])).join();
CompletableFuture<T> next = null;
for (CompletableFuture<T> future : futureQueue) {
if (future.isDone()) {
next = future;
break;
}
}
futureQueue.remove(next);
return next.join();
}
```


上面的代码中,执行线性扫描并记录了 `index`,确保移出操作时间复杂度为常量。尽管已经知道数组大小,为什么还要向 `CompletableFuture[]` 传 0?


[2]:https://shipilev.net/blog/2016/arrays-wisdom-ancients/


从实用角度看,这个方案应该是足够好了,"通常没有人会处理1万~2万大小的 future 集合",而且硬件支持的线程数量有上限。受堆栈大小等多种因素影响,实际支持的线程数量会有所差别。不过,一旦“[Loom 项目][3]”投入使用,这种情况可能会有改善。


> 译注:Loom 项目提供一个轻量级用户态的纤程,简化并发编程并且更为高效。


[3]:https://openjdk.java.net/projects/loom/


尽管如此,2万次迭代最乐观的情况下会访问2万个节点(即总是第一个完成的 future),至多访问[2亿个节点][4]节点。


[4]:https://en.wikipedia.org/wiki/Arithmetic_progression#Sum


如果无法依赖 `CompletableFuture` 引用相等性或者 hashcode 还可以做怎样的改进?


可以为 future 分配 id,将它们与对象 future 一起存储到 map 中,这样 future 可以通过关联的 index 标记自己。


所以,让我们把 future 存到 map 中:


```java
private final Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> indexedFutures;
```


现在,可以从一个单调递增序列中手动指定 id,并让 future 返回时带上 id:


```java
private static <T> Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> toIndexedFutures(List<CompletableFuture<T>> futures) {
Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> map
= new HashMap<>(futures.size(), 1); // 因为知道集合大小和预期的冲突计数 (0), 可以提前指定 HashMap 大小
int seq = 0;
for (CompletableFuture<T> future : futures) {
int index = seq++;
map.put(
index,
future.thenApply(
value -> new AbstractMap.SimpleEntry<>(index, value)));
}
return map;
}
```


现在,可以高效地找到并处理下一个完成的 future:等待 future,读取序列号,根据序列号从剩余序列中移除:


```java
private T nextCompleted()
{
return anyOf(indexedFutures.values()
.toArray(new CompletableFuture[0]))
.thenApply(result -> ((Map.Entry<Integer, T>) result))
.thenApply(result -> {
indexedFutures.remove(result.getKey());
return result.getValue();
}).join();
}
```


`tryAdvance()` 的实现很简单:


```java
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
if (!indexedFutures.isEmpty()) {
action.accept(nextCompleted());
return true;
} else {
return false;
}
}
```


最困难的部分已经解决,现在需要实现剩下的三个方法:


```java
@Override
public Spliterator<T> trySplit() {
return null; // 不支持 split
}
@Override
public long estimateSize()
{
return indexedFutures.size(); // 提前知道集合的大小
}
@Override
public int characteristics()
{
return
SIZED // 知道前面的大小
& IMMUTABLE // 输入的集合可安全地修改
& NONNULL; // 输入的集合不支持 null
}
```


到这里代码已经完成。


示例演示


可以加入随机处理延迟快速验证代码是否正确:


```java
public static void main(String[] args)
{
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<CompletableFuture<Integer>> futures = Stream
.iterate(0, i -> i + 1)
.limit(100)
.map(i -> CompletableFuture.supplyAsync(
withRandomDelay(i), executorService))
.collect(Collectors.toList());
completionOrder(futures)
.forEach(System.out::println);
}

private static Supplier<Integer> withRandomDelay(Integer i) {
return () -> {
try {
Thread.sleep(ThreadLocalRandom.current()
.nextInt(10000));
} catch (InterruptedException e) {
// 无耻地留白了, 请不要在生产环境中这么做
}
return i;
};
}
```


可以看到,结果没有按照原来的顺序返回:


Streaming Future 的原始顺序


```shell
6
5
2
4
1
11
8
12
3
```



按原始顺序 Streaming Future


假如要求只保持原来的顺序该怎么处理?


幸运的是,可以像下面这样实现,无需添加特别的实现:


```java
public static <T> Stream<T> originalOrder(
Collection<CompletableFuture<T>> futures) {
return futures.stream().map(CompletableFuture::join);
}
```


完整示例


```java
package com.pivovarit.collectors;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import static java.util.concurrent.CompletableFuture.anyOf;
/**
* @author Grzegorz Piwowarek
*/

final class CompletionOrderSpliterator<T> implements Spliterator<T> {
private final Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> indexedFutures;
CompletionOrderSpliterator(Collection<CompletableFuture<T>> futures) {
indexedFutures = toIndexedFutures(futures);
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
if (!indexedFutures.isEmpty()) {
action.accept(nextCompleted());
return true;
} else {
return false;
}
}
private T nextCompleted() {
return anyOf(indexedFutures.values().toArray(new CompletableFuture[0]))
.thenApply(result -> ((Map.Entry<Integer, T>) result))
.thenApply(result -> {
indexedFutures.remove(result.getKey());
return result.getValue();
}).join();
}
@Override
public Spliterator<T> trySplit() {
return null;
}
@Override
public long estimateSize()
{
return indexedFutures.size();
}
@Override
public int characteristics()
{
return SIZED & IMMUTABLE & NONNULL;
}
private static <T> Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> toIndexedFutures(Collection<CompletableFuture<T>> futures) {
Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> map = new HashMap<>(futures.size(), 1);
int counter = 0;
for (CompletableFuture<T> f : futures) {
int index = counter++;
map.put(index, f.thenApply(value -> new AbstractMap.SimpleEntry<>(index, value)));
}
return map;
}
}
```


本文完整的源代码也可以[在 GitHub 上找到][5]。


[5]:https://github.com/pivovarit/articles/blob/master/java-completion-order-spliterator/src/main/java/com/pivovarit/stream/CompletionOrderSpliterator.java


如果有更好的改进建议,欢迎留言!


推荐阅读

(点击标题可跳转阅读)

惰性日志

Java 8 并行流介绍

Jackson 属性自定义命名策略


看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

好文章,我在看❤️

登录查看更多
6

相关内容

Python分布式计算,171页pdf,Distributed Computing with Python
专知会员服务
107+阅读 · 2020年5月3日
【实用书】流数据处理,Streaming Data,219页pdf
专知会员服务
76+阅读 · 2020年4月24日
【2020新书】Kafka实战:Kafka in Action,209页pdf
专知会员服务
67+阅读 · 2020年3月9日
【ICLR2020】五篇Open代码的GNN论文
专知会员服务
47+阅读 · 2019年10月2日
用 GitLab 的 Merge Request 做代码评审
DevOps时代
4+阅读 · 2019年5月5日
抖音爬虫
专知
3+阅读 · 2019年2月11日
如何编写完美的 Python 命令行程序?
CSDN
5+阅读 · 2019年1月19日
【大数据】StreamSets:一个大数据采集工具
产业智能官
40+阅读 · 2018年12月5日
LibRec 精选:推荐的可解释性[综述]
LibRec智能推荐
10+阅读 · 2018年5月4日
设计和实现一款轻量级的爬虫框架
架构文摘
13+阅读 · 2018年1月17日
多轮对话之对话管理:Dialog Management
PaperWeekly
18+阅读 · 2018年1月15日
tensorflow LSTM + CTC实现端到端OCR
数据挖掘入门与实战
8+阅读 · 2017年11月15日
Directions for Explainable Knowledge-Enabled Systems
Arxiv
26+阅读 · 2020年3月17日
Knowledge Based Machine Reading Comprehension
Arxiv
4+阅读 · 2018年9月12日
Arxiv
12+阅读 · 2018年9月5日
VIP会员
相关资讯
用 GitLab 的 Merge Request 做代码评审
DevOps时代
4+阅读 · 2019年5月5日
抖音爬虫
专知
3+阅读 · 2019年2月11日
如何编写完美的 Python 命令行程序?
CSDN
5+阅读 · 2019年1月19日
【大数据】StreamSets:一个大数据采集工具
产业智能官
40+阅读 · 2018年12月5日
LibRec 精选:推荐的可解释性[综述]
LibRec智能推荐
10+阅读 · 2018年5月4日
设计和实现一款轻量级的爬虫框架
架构文摘
13+阅读 · 2018年1月17日
多轮对话之对话管理:Dialog Management
PaperWeekly
18+阅读 · 2018年1月15日
tensorflow LSTM + CTC实现端到端OCR
数据挖掘入门与实战
8+阅读 · 2017年11月15日
Top
微信扫码咨询专知VIP会员