如何从多个线程调用Sinks.Many<T>.tryEmitNext?

我的脑袋围绕着 Flux Sinks,无法理解更高层次的图片。使用时Sinks.Many<T> tryEmitNext,该函数会告诉我是否存在争用以及在失败的情况下我该怎么办(FailFast/Handler)。

但是有没有一个简单的结构可以让我从多个线程安全地发出元素。例如,与其让用户知道存在争用,我应该再试一次,不如将元素添加到队列(mpmc、mpsc 等)中,并且仅在队列已满时才通知。

现在我可以自己添加一个队列来缓解这个问题,但这似乎是一个常见的用例。我想我在这里遗漏了一点。

回答

我遇到了同样的问题,从支持多线程安全发射的处理器迁移。我使用这个自定义 EmitFailureHandler 来执行 EmitFailureHandler 文档建议的繁忙循环。

public static EmitFailureHandler etryOnNonSerializedElse(EmitFailureHandler fallback){
    return (signalType, emitResult) -> {
        if (emitResult == EmitResult.FAIL_NON_SERIALIZED) {
            LockSupport.parkNanos(10);
            return true;
        } else
            return fallback.onEmitFailure(signalType, emitResult);
    };
}

关于 3.4.0 实现有各种令人困惑的方面

  • 这意味着除非使用 Unsafe 变体,否则接收器支持序列化发射,但实际上所有序列化版本都会在并发发射的情况下快速失败。
  • Flux.Create 提供的 Sink 确实支持线程安全发射。

我希望图书馆在某个时候会提供一个可靠的工程替代方案。


以上是如何从多个线程调用Sinks.Many&lt;T&gt;.tryEmitNext?的全部内容。
THE END
分享
二维码
< <上一篇
下一篇>>