CompletableFuture on a parallel stream gets batched and runs slower than a sequential stream?
Approach 1
The usual way: very fast, and it works well.
public static int loops = 500;
private static ExecutorService customPool = Executors.newFixedThreadPool(loops);
.
.
Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
        .collect(Collectors.toList()).stream() // collect first, else will be sequential
        .map(CompletableFuture::join)
        .mapToLong(Long::longValue)
        .summaryStatistics();
log.info("cf completed in :: {}, summaryStats :: {} ", Duration.between(start, Instant.now()).toMillis(), stats);
// ... cf completed in :: 1054, summaryStats :: LongSummaryStatistics{count=500, sum=504008, min=1000, average=1008.016000, max=1017}
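I understand that if I don't collect the stream first, then because streams are lazy, each CompletableFuture is created and joined one at a time, so the calls effectively run synchronously. So, as an experiment:
Approach 2
Remove the intermediate collect step, but also make the stream parallel: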
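The laziness described above can be reproduced on a small scale. The following is a minimal sketch, not the code from the question: `LazyJoinDemo`, `slowCall`, `fusedMillis` and `collectedMillis` are hypothetical names, and the sleep is shortened to 100 ms so the comparison finishes quickly. It times the same pipeline with and without the intermediate collect.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class LazyJoinDemo {

    // Hypothetical stand-in for DummyProcess.slowNetworkCall (100 ms instead of 1 s).
    static Long slowCall(Long i) {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i;
    }

    // No intermediate collect: each element is submitted and then joined
    // before the sequential stream pulls the next element.
    static long fusedMillis(int n, ExecutorService pool) {
        Instant start = Instant.now();
        LongStream.range(0, n).boxed()
                .map(i -> CompletableFuture.supplyAsync(() -> slowCall(i), pool))
                .map(CompletableFuture::join)
                .mapToLong(Long::longValue)
                .sum();
        return Duration.between(start, Instant.now()).toMillis();
    }

    // Intermediate collect: every future is submitted before the first join.
    static long collectedMillis(int n, ExecutorService pool) {
        Instant start = Instant.now();
        LongStream.range(0, n).boxed()
                .map(i -> CompletableFuture.supplyAsync(() -> slowCall(i), pool))
                .collect(Collectors.toList()).stream()
                .map(CompletableFuture::join)
                .mapToLong(Long::longValue)
                .sum();
        return Duration.between(start, Instant.now()).toMillis();
    }

    public static void main(String[] args) {
        int n = 10;
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            System.out.println("without collect :: " + fusedMillis(n, pool) + " ms");
            System.out.println("with collect    :: " + collectedMillis(n, pool) + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

With 10 elements, the version without the collect should take roughly 10 sleeps back to back, while the collected version should take about one sleep, since the pool has one thread per task.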
Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
        .parallel()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
        .map(CompletableFuture::join) // direct join
        .mapToLong(Long::longValue).summaryStatistics();
log.info("cfps_directJoin completed in :: {}, summaryStats :: {} ", Duration.between(start, Instant.now()).toMillis(), stats);
// ... cfps_directJoin completed in :: 8098, summaryStats :: LongSummaryStatistics{count=500, sum=505002, min=1000, average=1010.004000, max=1015}
Summary:
- Approach 1 :: 1 second
- Approach 2 :: 8 seconds
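A pattern I observed:
- The parallel stream approach "batches" 60 calls at a time, so with 500 loops there are 500/60 ~ 8 batches, each taking 1 second, for a total of about 8 seconds.
- So, when I reduce the loop count to 300, there are 300/60 = 5 batches, and it does take 5 seconds to complete.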
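One way to check the batch size directly, rather than inferring it from the total time, is to count how many calls are in flight at once. The sketch below is an assumption-laden probe, not part of the original code: `ParallelJoinProbe` and `peakInFlight` are hypothetical names, and the sleep is a parameter. A parallel stream runs its pipeline on the common ForkJoinPool (plus the calling thread), so printing that pool's parallelism next to the measured peak may help show whether the limit comes from the stream's worker threads rather than from `customPool`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

public class ParallelJoinProbe {

    // Runs the "parallel + direct join" pipeline and returns the highest
    // number of tasks that were sleeping at the same moment.
    static int peakInFlight(int n, ExecutorService pool, long sleepMillis) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        LongStream.range(0, n).boxed()
                .parallel()
                .map(i -> CompletableFuture.supplyAsync(() -> {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the maximum seen so far
                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                    }
                    return i;
                }, pool))
                .map(CompletableFuture::join) // direct join, as in approach 2
                .mapToLong(Long::longValue)
                .sum();
        return peak.get();
    }

    public static void main(String[] args) {
        int loops = 500;
        ExecutorService pool = Executors.newFixedThreadPool(loops);
        try {
            System.out.println("available processors   :: " + Runtime.getRuntime().availableProcessors());
            System.out.println("common pool parallelism :: " + ForkJoinPool.commonPool().getParallelism());
            System.out.println("peak calls in flight    :: " + peakInFlight(loops, pool, 1000));
        } finally {
            pool.shutdown();
        }
    }
}
```

The peak is machine-dependent, so no expected output is given here. If it comes out well below `loops` even though `customPool` has 500 threads, that would be consistent with the batching described above.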
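So, the question is:
Why does this batching of calls happen in the parallel + direct join approach?
For completeness, here is my dummy network call method: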
public static Long slowNetworkCall(Long i) {
    Instant start = Instant.now();
    log.info(" {} going to sleep..", i);
    try {
        TimeUnit.MILLISECONDS.sleep(1000); // 1 second
    } catch (InterruptedException e) {
        e.printStackTrace();
        Thread.currentThread().interrupt(); // restore the interrupt flag rather than swallowing it
    }
    log.info(" {} woke up..", i);
    return Duration.between(start, Instant.now()).toMillis();
}