在JavaAkka流中处理不断变化的源数据
启动了 2 个线程。dataListUpdateThread将数字 2 添加到 a List。processFlowThread将相同的值List相加并将总和列表打印到控制台。这是代码:
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import static java.lang.Thread.sleep;
public class SourceExample {
private final static ActorSystem system = ActorSystem.create("SourceExample");
private static void delayOneSecond() {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void printValue(CompletableFuture<Integer> integerCompletableFuture) {
try {
System.out.println("Sum is " + integerCompletableFuture.get().intValue());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
final List dataList = new ArrayList<Integer>();
final Thread dataListUpdateThread = new Thread(() -> {
while (true) {
dataList.add(2);
System.out.println(dataList);
delayOneSecond();
}
});
dataListUpdateThread.start();
final Thread processFlowThread = new Thread(() -> {
while (true) {
final Source<Integer, NotUsed> source = Source.from(dataList);
final Sink<Integer, CompletionStage<Integer>> sink =
Sink.fold(0, (agg, next) -> agg + next);
final CompletionStage<Integer> sum = source.runWith(sink, system);
printValue(sum.toCompletableFuture());
delayOneSecond();
}
});
processFlowThread.start();
}
}
我试图创建最简单的示例来构建问题。dataListUpdateThread可以从 REST 服务或 Kafka 主题填充列表,而不仅仅是将值 2 添加到列表中。如果不使用 Java 线程,应该如何实现这种场景?换句话说,如何共享dataList到 Akka Stream 进行处理?
回答
改变传递给的集合Source.from只是巧合:如果集合耗尽,Source.from将完成流。这是因为它适用于有限的、严格评估的数据(用例基本上是:a)文档的简单示例和 b)在后台执行集合操作时想要限制资源消耗的情况(想想一个列表要将 HTTP 请求发送到的 URL))。
注意:自从 Java 7 天以来,我没有写过任何大量的 Java,所以我不提供 Java 代码,只是提供方法的概述。
正如之前的答案中提到的那样,这Source.queue可能是最好的选择(除了使用 Akka HTTP 或 Alpakka 连接器之类的东西)。在这种情况下,流的物化值是一个在流完成之前不会完成的未来,它Source.queue永远不会完成流(因为它无法知道它的引用是唯一的引用),引入KillSwitch和传播,通过viaMat和toMat会给你决定外流来完成数据流的能力。
, 的替代方法Source.queue是Source.actorRef,它可以让您发送一个独特的消息(akka.Done.done()在 Java API 中,这很常见)。该源具体化为ActorRef您可以向其tell发送消息的消息,并且这些消息(至少那些与流的类型匹配的消息)将可供流使用。
使用Source.queueand Source.actorRef,它通常对prematerialize他们有用:在您的示例中您还希望接收器的物化值的情况下的替代方法是大量使用Mat运算符来自定义物化值(在 Scala 中,可以使用元组至少简化多个物化值的组合,但是在 Java 中,一旦你超出了一对(就像你一样queue),我很确定你必须定义一个类来保存三个(队列,killswitch,未来为完成值)物化值)。
还值得注意的是,由于 Akka Streams 在后台运行在 actor 上(因此根据需要被调度到ActorSystem的线程上),几乎没有理由创建一个线程来运行流。