自SpringCloudStream3.1版本起已弃用EnableBinding、Output、Input

从 3.1 版开始,不推荐使用用于处理队列的主要 API。在课堂评论中它说:

从 3.1 开始弃用,支持函数式编程模型

我在网上搜索了很多解决方案,但没有找到关于我应该如何迁移的可靠 E2E 解释。

寻找以下示例:

  1. 从队列中读取
  2. 写入队列

如果有几种方法可以做到这一点(正如我在网上看到的那样),我很乐意为每个选项提供解释和典型用例。

回答

  1. 我假设您已经熟悉主要概念,并将重点介绍迁移。
  2. 我使用 kotlin 作为演示代码,以减少冗长

首先,一些可能有帮助的参考资料:

  • 这是最初的相关文档: 链接
  • 这是对新功能格式中命名方案的解释:链接
  • 这是一些更高级的场景的更详细的解释:链接

TL; 博士

相反,基于注解的配置工作,春天现在使用检测的豆Consumer/ Function/Supplier来定义数据流为您服务。

输入/消费者

而在你的代码看起来像这样之前:

interface BindableGradesChannel {
    @Input
    fun gradesChannel(): SubscribableChannel

    companion object {
        const val INPUT = "gradesChannel"
    }
}

用法类似于:

@Service
@EnableBinding(BindableGradesChannel::class)
class GradesListener {
    private val log = LoggerFactory.getLogger(GradesListener::class.java)
    
    @StreamListener(BindableScoresChannel.INPUT)
    fun listen(grade: Grade) {
        log.info("Received $grade")
        // do something
    }
}

现在整个定义是无关紧要的,可以这样做:

@Service
class GradesListener {
    private val log = LoggerFactory.getLogger(GradesListener::class.java)

    @Bean
    fun gradesChannel(): Consumer<Grade> {
        return Consumer { listen(grade = it) }
    }
    
    fun listen(grade: Grade) {
        log.info("Received $grade")
        // do something
    }
}

注意Consumerbean如何替换@StreamListener@Input

关于配置,如果之前为了配置你有一个application.yml看起来像这样:

spring:
  cloud:
    stream:
      bindings:
        gradesChannel:
          destination: GradesExchange
          group: grades-updates
          consumer:
            concurrency: 10
            max-attempts: 3

现在应该是这样的:

spring:
  cloud:
    stream:
      bindings:
        gradesChannel-in-0:
          destination: GradesExchange
          group: grades-updates
          consumer:
            concurrency: 10
            max-attempts: 3

注意如何gradesChannel被替换为gradesChannel-in-0- 要了解完整的命名约定,请参阅顶部的命名约定链接。

一些细节:

  1. 如果您的应用程序中有多个这样的 bean,则需要定义该spring.cloud.function.definition属性。
  2. 您可以选择为您的频道提供自定义名称,因此如果您想继续使用,gradesChannel您可以spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel在配置中的任何地方进行设置和使用gradesChannel

输出/供应商

这里的概念是相似的,您将配置和代码替换为如下所示:

interface BindableStudentsChannel {
    @Output
    fun studentsChannel(): MessageChannel
}

@Service
@EnableBinding(BindableStudentsChannel::class)
class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) {
    fun publish(message: Message<Student>) {
        studentsChannel.studentsChannel().send(message)
    }
}

现在可以替换为:

@Service
class StudentsQueueWriter {
    @Bean
    fun studentsChannel(): Supplier<Student> {
        return Supplier { Student("Adam") }
    }
}

正如您所看到的,我们有很大的不同——它是什么时候调用的,由谁调用?

之前我们可以手动触发它,但现在它是由 spring 每秒触发的(默认情况下)。这适用于需要每秒发布传感器数据的用例,但是当您想在事件上发送消息时这并不好。除了Function出于任何原因使用之外,spring 还提供了两种选择:

StreamBridge -链接

使用StreamBridge你可以。像这样明确定义目标:

@Service
class StudentsQueueWriter(private val streamBridge: StreamBridge) {
    fun publish(message: Message<Student>) {
        streamBridge.send("studentsChannel-out-0", message)
    }
}

这样您就不会将目标通道定义为 bean,但您仍然可以发送消息。缺点是您的类中有一些显式配置。

反应器 API -链接

另一种方法是使用某种反应机制,例如EmitterProcessor, 并返回它。使用它,您的代码将类似于:

@Service
class StudentsQueueWriter {
    val students: EmitterProcessor<Student> = EmitterProcessor.create()
    @Bean
    fun studentsChannel(): Supplier<Flux<Student>> {
        return Supplier { students }
    }
}

用法可能类似于:

class MyClass(val studentsQueueWriter: StudentsQueueWriter) {
    fun newStudent() {
        studentsQueueWriter.students.onNext(Student("Adam"))
    }
}

  • Also, please see the [relevant section in the doc](https://docs.spring.io/spring-cloud-stream/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-producing-consuming-messages) as well as these two blog posts - https://spring.io/blog/2019/10/14/spring-cloud-stream-demystified-and-simplified and https://spring.io/blog/2019/10/17/spring-cloud-stream-functional-and-reactive

以上是自SpringCloudStream3.1版本起已弃用EnableBinding、Output、Input的全部内容。
THE END
分享
二维码
< <上一篇
下一篇>>