Stream.parallel()不是更新了spliterator的特性吗?
这个问题是基于这个问题的答案Stream.of 和 IntStream.range 有什么区别?
由于IntStream.range产生了一个已经排序的流,下面代码的输出只会生成如下输出0:
IntStream.range(0, 4)
.peek(e -> System.out.println(e))
.sorted()
.findFirst();
分离器也将具有SORTED特性。下面的代码返回true:
System.out.println(
IntStream.range(0, 4)
.spliterator()
.hasCharacteristics(Spliterator.SORTED)
);
现在,如果我parallel()在第一个代码中引入 a ,那么正如预期的那样,输出将包含从0to 的所有 4 个数字,3但顺序是随机的,因为由于parallel().
IntStream.range(0, 4)
.parallel()
.peek(e -> System.out.println(e))
.sorted()
.findFirst();
这将按任何顺序产生如下所示的内容:
2
0
1
3
2
0
1
3
因此,我预计该SORTED属性已因parallel(). 但是,下面的代码true也会返回。
为什么不parallel()改变SORTED属性?并且由于所有四个数字都被打印出来,Java 如何意识到即使该SORTED属性仍然存在,流也没有排序?
回答
这究竟是如何完成的在很大程度上是一个实现细节。您必须深入挖掘源代码才能真正了解原因。基本上,并行和顺序管道的处理方式不同。查看AbstractPipeline.evaluate,它检查isParallel(),然后根据管道是否并行执行不同的操作。
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
如果然后查看SortedOps.OfInt,您会看到它覆盖了两个方法:
@Override
public Sink<Integer> opWrapSink(int flags, Sink sink) {
Objects.requireNonNull(sink);
if (StreamOpFlag.SORTED.isKnown(flags))
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedIntSortingSink(sink);
else
return new IntSortingSink(sink);
}
@Override
public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
Spliterator<P_IN> spliterator,
IntFunction<Integer[]> generator) {
if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
return helper.evaluate(spliterator, false, generator);
}
else {
Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
int[] content = n.asPrimitiveArray();
Arrays.parallelSort(content);
return Nodes.node(content);
}
}
opWrapSink如果它是顺序管道,则最终将被调用,并且opEvaluateParallel(正如其名称所暗示的)将在它是并行流时被调用。请注意,opWrapSink如果管道已经排序(只是将其返回不变),则如何不对给定的接收器执行任何操作,但opEvaluateParallel始终评估拆分器。
另请注意,并行性和排序性并不相互排斥。您可以拥有具有这些特征的任意组合的流。
“排序”是Spliterator. 从技术上讲,它不是 a 的特征Stream(就像“平行”一样)。当然,parallel可以创建一个具有全新特性的全新拆分器(从原始拆分器中获取元素)的流,但是为什么要这样做,因为您可以重用相同的拆分器?我想你在任何情况下都必须以不同的方式处理并行和顺序流。