你有测试来显示反应器map()和flatMap()之间的差异吗?
我仍在尝试了解 reactor map() 和 flatMap() 方法之间的区别。首先我查看了 API,但它并没有真正的帮助,它让我更加困惑。然后我用谷歌搜索了很多,但似乎没有人有一个例子来使差异变得可以理解,如果有任何差异的话。
因此,我尝试编写两个测试来查看每种方法的不同行为。但不幸的是它并没有像我希望的那样工作......
第一个测试方法是测试反应式 flatMap() 方法:
@Test
void fluxFlatMapTest() {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.window(2)
.flatMap(fluxOfInts -> fluxOfInts.map(this::processNumber).subscribeOn(Schedulers.parallel()))
.doOnNext(System.out::println)
.subscribe();
}
输出符合预期,可解释,如下所示:
9 - parallel-2
1 - parallel-1
4 - parallel-1
25 - parallel-3
36 - parallel-3
49 - parallel-4
64 - parallel-4
81 - parallel-5
100 - parallel-5
16 - parallel-2
第二种方法应该测试 map() 方法的输出,以与 flatMap() 方法的上述结果进行比较。
@Test
void fluxMapTest() {
final int start = 1;
final int stop = 100;
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.window(2)
.map(fluxOfInts -> fluxOfInts.map(this::processNumber).subscribeOn(Schedulers.parallel()))
.doOnNext(System.out::println)
.subscribe();
}
这个测试方法有输出,我完全没想到,看起来像这样:
FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn
有一个看起来像这样的小助手方法:
private String processNumber(Integer x) {
String squaredValueAsString = String.valueOf(x * x);
return squaredValueAsString.concat(" - ").concat(Thread.currentThread().getName());
}
这里没什么特别的。
我将 Spring Boot 2.3.4 与 Java 11 和 Spring 的 reactor 实现一起使用。
您是否有一个很好的解释示例,或者您是否知道如何更改上述测试以使它们有意义?那么请帮我解决这个问题。非常感谢!
回答
Reactor 是 Webflux 中的底层库,它包含一个叫做 the 的东西event loop,我相信它是基于一个叫做LMAX Architecture 的架构。
这意味着event loop是单线程事件处理器。事件循环之前的所有内容都可以是多线程的,但事件本身由单个线程处理。The event loop.
常规的 Spring Boot 应用程序通常使用服务器 tomcat 或 undertow 运行,而 webflux 默认由事件驱动服务器 Netty 运行,Netty 反过来使用它event loop来为我们处理事件。
所以现在我们了解了一切的背后是什么,我们可以开始谈论map和flatMap。
地图
如果我们查看 api,我们可以看到以下图像:
和 api 文本说:
通过对每个项目应用同步函数来转换此 Flux 发出的项目。
这是不言自明的。我们有一个Flux项目,每次 map 要求处理一个项目时,它不会要求另一个项目,直到它完成第一个处理。因此是同步的。
图像显示,绿色圆圈需要转换为绿色方块,直到我们可以要求将黄色圆圈转换为黄色方块......等等。
这是一个代码示例:
Flux.just("a", "b", "c")
.map(value -> value.toUppercase())
.subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));
// Output
A - main
B - main
C - main
每个都在主线程上运行,并同步处理。
平面图
如果我们查看 api,我们可以看到以下图像:
和文字说:
将此 Flux 发出的元素异步转换为 Publishers,然后通过合并将这些内部发布者扁平化为单个 Flux,允许它们交错。
它基本上使用三个步骤来做到这一点:
- 内部和订阅的生成:这个运营商急切地订阅它的内部。
- 扁平值的排序:此运算符不一定保留原始排序,因为内部元素在到达时被扁平化。
- 交错:这个运算符让来自不同内部的值交错(类似于合并内部序列)。
那么这是什么意思?嗯,它基本上意味着:
-
它将获取通量中的每个项目,并将其转换为个人
Mono(出版商),每个项目中都有一个项目。 -
在处理项目时对项目进行排序,
flatMap确实会NOT保留顺序,因为项目可以在事件循环中以不同的时间处理。 -
将所有处理过的项目合并回一个
Flux以供进一步处理。
这是一个代码示例:
Flux.just("a", "b", "c")
.flatMap(value -> Mono.just(value.toUpperCase()))
.subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));
// Output
A - main
B - main
C - main
等待 flatMap 打印与地图相同的东西!
好吧,这一切又回到了我们之前谈到的线程模型。实际上只有一个称为事件循环的线程来处理所有事件。
Reactor 是并发不可知的,这意味着任何人worker都可以安排由event loop.
所以什么是worker井 aworker是 ascheduler可以产生的东西。一件重要的事情是,worker 不必是一个线程,它可以是,但不一定是。
在上面的代码案例中,主线程订阅了我们的通量,这意味着主线程将为我们处理它并安排事件循环处理的工作。
在服务器环境中,这不一定是这种情况。这里要理解的重要一点是,workers如果需要,reactor 可以随时切换(也就是可能的线程)。
在我上面的代码示例中,只有一个主线程,因此不需要在多个线程上运行,也不需要并行执行。
如果我想强制它,我可以使用不同的调度程序之一,这些调度程序都有其用途。在 Netty 中,服务器将启动与您机器上的内核数量相同的事件循环线程,因此如果需要,它可以在重负载下自由切换工作线程和内核,以最大限度地利用所有事件循环。
FlatMap 异步并不意味着并行,这意味着它将同时安排事件循环处理的所有事情,但它仍然只有一个线程执行任务。
并行执行
如果我真的想并行执行某些操作,例如可以将某些内容放在并行调度程序上。这意味着它将保证workers多个内核上的多个。但请记住,当您的程序运行时,有一个设置时间,这通常仅在您有大量计算内容而需要大量单核 CPU 能力时才有用。
代码示例:
Flux.just("a", "b", "c")
.flatMap(value -> value -> Mono.just(value.toUpperCase()))
.subscribeOn(Schedulers.parallel())
.subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));
// Output
A - parallel-1
B - parallel-1
C - parallel-1
在这里,我们仍然只在一个线程上运行,因为这subscribeOn意味着当一个线程订阅时,Scheduler它将从调度程序池中选择一个线程,然后在整个执行过程中坚持下去。
如果我们想绝对感觉到需要在多个线程上强制执行,我们可以例如使用并行通量。
Flux.range(1, 10)
.parallel(2)
.runOn(Schedulers.parallel())
.subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
// Output
parallel-3 -> 2
parallel-2 -> 1
parallel-3 -> 4
parallel-2 -> 3
parallel-3 -> 6
parallel-2 -> 5
parallel-3 -> 8
parallel-2 -> 7
parallel-3 -> 10
parallel-2 -> 9
但请记住,这在大多数情况下是不必要的。有一个设置时间,这种类型的执行通常只有在您有很多 CPU 繁重的任务时才有用。否则使用默认的事件循环单线程在大多数情况下“可能”会更快。
处理大量 I/O 任务,通常更多地是关于编排,而不是原始 CPU 能力。
这里的大部分信息都是从Flux和Monoapi 中获取的。
该反应器的文档是信息的惊人的,有趣的来源。
还有 Simon Baslé 的博客系列Flight of the flux也是一本精彩而有趣的读物。它也以Youtube 格式存在
这里和那里也有一些错误,我也做了一些假设,尤其是在 Reactor 的内部工作方面。但希望这至少能澄清一些想法。
如果有人觉得事情直接有问题,请随时进行编辑。