你有测试来显示反应器map()和flatMap()之间的差异吗?

我仍在尝试了解 reactor map() 和 flatMap() 方法之间的区别。首先我查看了 API,但它并没有真正的帮助,它让我更加困惑。然后我用谷歌搜索了很多,但似乎没有人有一个例子来使差异变得可以理解,如果有任何差异的话。

因此,我尝试编写两个测试来查看每种方法的不同行为。但不幸的是它并没有像我希望的那样工作......

第一个测试方法是测试反应式 flatMap() 方法:

@Test
void fluxFlatMapTest() {
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .window(2)
            .flatMap(fluxOfInts -> fluxOfInts.map(this::processNumber).subscribeOn(Schedulers.parallel()))
            .doOnNext(System.out::println)
            .subscribe();
}

输出符合预期,可解释,如下所示:

9 - parallel-2
1 - parallel-1
4 - parallel-1
25 - parallel-3
36 - parallel-3
49 - parallel-4
64 - parallel-4
81 - parallel-5
100 - parallel-5
16 - parallel-2

第二种方法应该测试 map() 方法的输出,以与 flatMap() 方法的上述结果进行比较。

@Test
void fluxMapTest() {
    final int start = 1;
    final int stop = 100;
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .window(2)
            .map(fluxOfInts -> fluxOfInts.map(this::processNumber).subscribeOn(Schedulers.parallel()))
            .doOnNext(System.out::println)
            .subscribe();
}

这个测试方法有输出,我完全没想到,看起来像这样:

FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn

有一个看起来像这样的小助手方法:

private String processNumber(Integer x) {
    String squaredValueAsString = String.valueOf(x * x);
    return squaredValueAsString.concat(" - ").concat(Thread.currentThread().getName());
}

这里没什么特别的。

我将 Spring Boot 2.3.4 与 Java 11 和 Spring 的 reactor 实现一起使用。

您是否有一个很好的解释示例,或者您是否知道如何更改上述测试以使它们有意义?那么请帮我解决这个问题。非常感谢!

回答

Reactor 是 Webflux 中的底层库,它包含一个叫做 the 的东西event loop,我相信它是基于一个叫做LMAX Architecture 的架构。

这意味着event loop是单线程事件处理器。事件循环之前的所有内容都可以是多线程的,但事件本身由单个线程处理。The event loop.

常规的 Spring Boot 应用程序通常使用服务器 tomcat 或 undertow 运行,而 webflux 默认由事件驱动服务器 Netty 运行,Netty 反过来使用它event loop来为我们处理事件。

所以现在我们了解了一切的背后是什么,我们可以开始谈论mapflatMap

地图

如果我们查看 api,我们可以看到以下图像:

和 api 文本说:

通过对每个项目应用同步函数来转换此 Flux 发出的项目。

这是不言自明的。我们有一个Flux项目,每次 map 要求处理一个项目时,它不会要求另一个项目,直到它完成第一个处理。因此是同步的

图像显示,绿色圆圈需要转换为绿色方块,直到我们可以要求将黄色圆圈转换为黄色方块......等等。

这是一个代码示例:

Flux.just("a", "b", "c")
    .map(value -> value.toUppercase())
    .subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));

// Output
A - main
B - main
C - main

每个都在主线程上运行,并同步处理。

平面图

如果我们查看 api,我们可以看到以下图像:

和文字说:

将此 Flux 发出的元素异步转换为 Publishers,然后通过合并将这些内部发布者扁平化为单个 Flux,允许它们交错。

它基本上使用三个步骤来做到这一点:

  • 内部和订阅的生成:这个运营商急切地订阅它的内部。
  • 扁平值的排序:此运算符不一定保留原始排序,因为内部元素在到达时被扁平化。
  • 交错:这个运算符让来自不同内部的值交错(类似于合并内部序列)。

那么这是什么意思?嗯,它基本上意味着:

  1. 它将获取通量中的每个项目,并将其转换为个人Mono(出版商),每个项目中都有一个项目。

  2. 在处理项目时对项目进行排序,flatMap确实会NOT保留顺序,因为项目可以在事件循环中以不同的时间处理。

  3. 将所有处理过的项目合并回一个Flux以供进一步处理。

这是一个代码示例:

Flux.just("a", "b", "c")
        .flatMap(value -> Mono.just(value.toUpperCase()))
         .subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));

// Output
A - main
B - main
C - main

等待 flatMap 打印与地图相同的东西!

好吧,这一切又回到了我们之前谈到的线程模型。实际上只有一个称为事件循环的线程来处理所有事件。

Reactor 是并发不可知的,这意味着任何人worker都可以安排由event loop.

所以什么是worker井 aworker是 ascheduler可以产生的东西。一件重要的事情是,worker 不必是一个线程,它可以是,但不一定是。

在上面的代码案例中,主线程订阅了我们的通量,这意味着主线程将为我们处理它并安排事件循环处理的工作。

在服务器环境中,这不一定是这种情况。这里要理解的重要一点是,workers如果需要,reactor 可以随时切换(也就是可能的线程)

在我上面的代码示例中,只有一个主线程,因此不需要在多个线程上运行,也不需要并行执行。

如果我想强制它,我可以使用不同的调度程序之一,这些调度程序都有其用途。在 Netty 中,服务器将启动与您机器上的内核数量相同的事件循环线程,因此如果需要,它可以在重负载下自由切换工作线程和内核,以最大限度地利用所有事件循环。

FlatMap 异步并不意味着并行,这意味着它将同时安排事件循环处理的所有事情,但它仍然只有一个线程执行任务。

并行执行

如果我真的想并行执行某些操作,例如可以将某些内容放在并行调度程序上。这意味着它将保证workers多个内核上的多个。但请记住,当您的程序运行时,有一个设置时间,这通常仅在您有大量计算内容而需要大量单核 CPU 能力时才有用。

代码示例:

Flux.just("a", "b", "c")
    .flatMap(value -> value -> Mono.just(value.toUpperCase()))
    .subscribeOn(Schedulers.parallel())
    .subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));

// Output
A - parallel-1
B - parallel-1
C - parallel-1

在这里,我们仍然只在一个线程上运行,因为这subscribeOn意味着当一个线程订阅时,Scheduler它将从调度程序池中选择一个线程,然后在整个执行过程中坚持下去。

如果我们想绝对感觉到需要在多个线程上强制执行,我们可以例如使用并行通量。

Flux.range(1, 10)
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));

// Output
parallel-3 -> 2
parallel-2 -> 1
parallel-3 -> 4
parallel-2 -> 3
parallel-3 -> 6
parallel-2 -> 5
parallel-3 -> 8
parallel-2 -> 7
parallel-3 -> 10
parallel-2 -> 9

但请记住,这在大多数情况下是不必要的。有一个设置时间,这种类型的执行通常只有在您有很多 CPU 繁重的任务时才有用。否则使用默认的事件循环单线程在大多数情况下“可能”会更快。

处理大量 I/O 任务,通常更多地是关于编排,而不是原始 CPU 能力。


这里的大部分信息都是从FluxMonoapi 中获取的。

该反应器的文档是信息的惊人的,有趣的来源。

还有 Simon Baslé 的博客系列Flight of the flux也是一本精彩而有趣的读物。它也以Youtube 格式存在

这里和那里也有一些错误,我也做了一些假设,尤其是在 Reactor 的内部工作方面。但希望这至少能澄清一些想法。

如果有人觉得事情直接有问题,请随时进行编辑。


以上是你有测试来显示反应器map()和flatMap()之间的差异吗?的全部内容。
THE END
分享
二维码
< <上一篇
下一篇>>