自SpringCloudStream3.1版本起已弃用EnableBinding、Output、Input
从 3.1 版开始,不推荐使用用于处理队列的主要 API。在课堂评论中它说:
从 3.1 开始弃用,支持函数式编程模型
我在网上搜索了很多解决方案,但没有找到关于我应该如何迁移的可靠 E2E 解释。
寻找以下示例:
- 从队列中读取
- 写入队列
如果有几种方法可以做到这一点(正如我在网上看到的那样),我很乐意为每个选项提供解释和典型用例。
回答
- 我假设您已经熟悉主要概念,并将重点介绍迁移。
- 我使用 kotlin 作为演示代码,以减少冗长
首先,一些可能有帮助的参考资料:
- 这是最初的相关文档: 链接
- 这是对新功能格式中命名方案的解释:链接
- 这是一些更高级的场景的更详细的解释:链接
TL; 博士
相反,基于注解的配置工作,春天现在使用检测的豆Consumer/ Function/Supplier来定义数据流为您服务。
输入/消费者
而在你的代码看起来像这样之前:
interface BindableGradesChannel {
@Input
fun gradesChannel(): SubscribableChannel
companion object {
const val INPUT = "gradesChannel"
}
}
用法类似于:
@Service
@EnableBinding(BindableGradesChannel::class)
class GradesListener {
private val log = LoggerFactory.getLogger(GradesListener::class.java)
@StreamListener(BindableScoresChannel.INPUT)
fun listen(grade: Grade) {
log.info("Received $grade")
// do something
}
}
现在整个定义是无关紧要的,可以这样做:
@Service
class GradesListener {
private val log = LoggerFactory.getLogger(GradesListener::class.java)
@Bean
fun gradesChannel(): Consumer<Grade> {
return Consumer { listen(grade = it) }
}
fun listen(grade: Grade) {
log.info("Received $grade")
// do something
}
}
注意Consumerbean如何替换@StreamListener和@Input。
关于配置,如果之前为了配置你有一个application.yml看起来像这样:
spring:
cloud:
stream:
bindings:
gradesChannel:
destination: GradesExchange
group: grades-updates
consumer:
concurrency: 10
max-attempts: 3
现在应该是这样的:
spring:
cloud:
stream:
bindings:
gradesChannel-in-0:
destination: GradesExchange
group: grades-updates
consumer:
concurrency: 10
max-attempts: 3
注意如何gradesChannel被替换为gradesChannel-in-0- 要了解完整的命名约定,请参阅顶部的命名约定链接。
一些细节:
- 如果您的应用程序中有多个这样的 bean,则需要定义该
spring.cloud.function.definition属性。 - 您可以选择为您的频道提供自定义名称,因此如果您想继续使用,
gradesChannel您可以spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel在配置中的任何地方进行设置和使用gradesChannel。
输出/供应商
这里的概念是相似的,您将配置和代码替换为如下所示:
interface BindableStudentsChannel {
@Output
fun studentsChannel(): MessageChannel
}
和
@Service
@EnableBinding(BindableStudentsChannel::class)
class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) {
fun publish(message: Message<Student>) {
studentsChannel.studentsChannel().send(message)
}
}
现在可以替换为:
@Service
class StudentsQueueWriter {
@Bean
fun studentsChannel(): Supplier<Student> {
return Supplier { Student("Adam") }
}
}
正如您所看到的,我们有很大的不同——它是什么时候调用的,由谁调用?
之前我们可以手动触发它,但现在它是由 spring 每秒触发的(默认情况下)。这适用于需要每秒发布传感器数据的用例,但是当您想在事件上发送消息时这并不好。除了Function出于任何原因使用之外,spring 还提供了两种选择:
StreamBridge -链接
使用StreamBridge你可以。像这样明确定义目标:
@Service
class StudentsQueueWriter(private val streamBridge: StreamBridge) {
fun publish(message: Message<Student>) {
streamBridge.send("studentsChannel-out-0", message)
}
}
这样您就不会将目标通道定义为 bean,但您仍然可以发送消息。缺点是您的类中有一些显式配置。
反应器 API -链接
另一种方法是使用某种反应机制,例如EmitterProcessor, 并返回它。使用它,您的代码将类似于:
@Service
class StudentsQueueWriter {
val students: EmitterProcessor<Student> = EmitterProcessor.create()
@Bean
fun studentsChannel(): Supplier<Flux<Student>> {
return Supplier { students }
}
}
用法可能类似于:
class MyClass(val studentsQueueWriter: StudentsQueueWriter) {
fun newStudent() {
studentsQueueWriter.students.onNext(Student("Adam"))
}
}
- Also, please see the [relevant section in the doc](https://docs.spring.io/spring-cloud-stream/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-producing-consuming-messages) as well as these two blog posts - https://spring.io/blog/2019/10/14/spring-cloud-stream-demystified-and-simplified and https://spring.io/blog/2019/10/17/spring-cloud-stream-functional-and-reactive