我的阿克卡流继续学习。我想将我的akka应用程序与akka-集群和DistributedPubSubMediator集成。
添加对发布的支持是相当直截了当的,但我的订阅部分有问题。
作为参考,订阅服务器在型钢样品中如下所示
class ChatClient(name: String) extends Actor {
val mediator = DistributedPubSub(context.system).mediator
mediator ! Subscribe("some topic", self)
def receive = {
case ChatClient.Message(from, text) =>
...process message...
}
}我的问题是,我应该如何将这个角色集成到我的流中,以及如何确保在没有流背压的情况下得到发布消息?
我正在尝试完成一个pubsub模型,其中一个流可能发布一个消息,而另一个流将使用它(如果订阅的话)。
发布于 2016-02-03 03:07:01
您可能希望让您的演员扩展ActorPublisher。然后您可以从它创建一个源,并将其集成到您的流中。
参见ActorPublisher上的文档:http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-integrations.html
发布于 2017-11-16 14:35:53
其他的答案已经过时了:他们建议使用自2.5.0版本以来就被废弃的ActorPublisher。
对于那些对当前方法感兴趣的人,Colin在他的博客中写了一个关于整合Akka Streams和Akka演员的优秀系列文章。在这个系列的过程中,Breck充实了一个从Akka流和普通参与者开始的系统,然后结合了Akka集群和Akka持久性。本系列的第一篇文章是这里 (分布式流处理部分在第3部分中)。
https://stackoverflow.com/questions/35167562
复制相似问题