Strange results when using reduce with a parallel stream in Java
The following code produces results that are wrong and that change from run to run.
List<String> list = Stream.of("a", "b").parallel().map(e -> List.of(e + "1", e + "2"))
.reduce(new ArrayList<>(),
(r, l) -> {
log.info("accumulator r:{}, l:{}", r, l);
r.addAll(l);
return r;
},
(r, l) -> {
log.info("combiner r:{}, l:{}", r, l);
r.addAll(l);
return r;
});
System.out.println(list);
First result:
20:52:35.709 [main] - accumulator r:[], l:[b1, b2]
20:52:35.709 [ForkJoinPool.commonPool-worker-3] - accumulator r:[], l:[a1, a2]
20:52:35.711 [ForkJoinPool.commonPool-worker-3] - combiner r:[b1, b2], l:[b1, b2]
[b1, b2, b1, b2]
Second result:
20:53:09.781 [main] - accumulator r:[], l:[b1, b2]
20:53:09.782 [ForkJoinPool.commonPool-worker-3] - accumulator r:[], l:[a1, a2]
20:53:09.783 [ForkJoinPool.commonPool-worker-3] - combiner r:[b1, b2, a1, a2], l:[b1, b2, a1, a2]
[b1, b2, a1, a2, b1, b2, a1, a2]
Third result:
20:53:27.321 [main] - accumulator r:[], l:[b1, b2]
20:53:27.321 [ForkJoinPool.commonPool-worker-3] - accumulator r:[], l:[a1, a2]
20:53:27.324 [ForkJoinPool.commonPool-worker-3] - combiner r:[a1, a2], l:[a1, a2]
[a1, a2, a1, a2]
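Answer
When using a parallel stream, never mutate the accumulator value inside the functions passed to reduce; return a new instance instead. In your code the identity, new ArrayList<>(), is created once and handed to every thread, so two threads modify the same list instance at the same time. ArrayList is not thread-safe, which is why the result is nondeterministic.
To see what happens in your code, add a log statement after r.addAll(l) in the accumulator: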
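As a minimal sketch of that rule (not necessarily the only way to write it), the reduction below builds a fresh list in every step instead of calling addAll on its argument. The class name NonMutatingReduce and the helper concat are made up for this illustration. Using the immutable List.of() as the identity is an extra assumption of the sketch: any accidental mutation of the identity would throw instead of silently corrupting the result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class NonMutatingReduce {

    // Hypothetical helper: returns a NEW list containing left followed by right.
    // Neither argument is modified, so sharing them between threads is safe.
    static List<String> concat(List<String> left, List<String> right) {
        List<String> merged = new ArrayList<>(left);
        merged.addAll(right);
        return merged;
    }

    static List<String> run() {
        // Immutable identity: it is shared by all threads, so it must never change.
        List<String> identity = List.of();
        return Stream.of("a", "b")
                .parallel()
                .map(e -> List.of(e + "1", e + "2"))
                // The same non-mutating function serves as accumulator and combiner.
                .reduce(identity, NonMutatingReduce::concat, NonMutatingReduce::concat);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a1, a2, b1, b2]
    }
}
```

Because Stream.of yields an ordered stream and concat is associative, the result keeps the encounter order even when the stream runs in parallel.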
(r, l) -> {
log.info("accumulator before r:{}, l:{}", r, l);
r.addAll(l);
log.info("accumulator after r:{}", r);
return r;
}
Here are the logs:
accumulator before r:[], l:[b1, b2]
accumulator before r:[], l:[a1, a2]
accumulator after r:[b1, b2]
accumulator after r:[b1, b2, a1, a2]
combiner r:[b1, b2, a1, a2], l:[b1, b2, a1, a2]
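- Thread 1 enters the reduce function; the accumulator is []
- Thread 2 enters the reduce function; the accumulator is []
- Thread 1 adds [b1, b2] to the accumulator, which becomes [b1, b2]
- Thread 2 adds [a1, a2] to the accumulator, which becomes [b1, b2, a1, a2]
- The combiner merges the same accumulator instance with itself, giving [b1, b2, a1, a2, b1, b2, a1, a2]
The correct implementation is as follows: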
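The steps above can be replayed on a single thread, which removes the timing element and shows only the aliasing problem. This is a simulation under that assumption, not what the stream framework literally executes; the class name SharedInstanceCombine and the method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedInstanceCombine {

    // Same shape as the functions in the question: mutates r and returns it.
    static List<String> addInto(List<String> r, List<String> l) {
        r.addAll(l);
        return r;
    }

    static List<String> demo() {
        // One identity instance, as in reduce(new ArrayList<>(), ...).
        List<String> shared = new ArrayList<>();

        // Each "thread" accumulates into the identity and returns it,
        // so both partial results are the very same object.
        List<String> partial1 = addInto(shared, List.of("b1", "b2"));
        List<String> partial2 = addInto(shared, List.of("a1", "a2"));

        // The combiner now appends the list to itself, doubling its contents.
        return addInto(partial1, partial2);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [b1, b2, a1, a2, b1, b2, a1, a2]
    }
}
```

In the real parallel run the two addAll calls can also overlap on the unsynchronized ArrayList, which would account for the other outputs such as [b1, b2, b1, b2].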