文章目录 集群中的分布式发布订阅 依赖 简介 发布 主题组 发送 DistributedPubSub 扩展 传递保证 集群中的分布式发布订阅 依赖 为了使用分布式发布订阅(Distributed Publish 中介程序可以以DistributedPubSub扩展启动,也可以作为普通的 Actor 启动。 注册表最终是一致的,即更改在其他节点上不立即可见,但通常在几秒钟后将其完全复制到所有其他节点。 DistributedPubSub 扩展 在上面的示例中,使用akka.cluster.pubsub.DistributedPubSub扩展启动和访问中介。 可以使用以下属性配置DistributedPubSub扩展: # Settings for the DistributedPubSub extension akka.cluster.pub-sub { akka.extensions = ["akka.cluster.pubsub.DistributedPubSub"] 传递保证 与 Akka 中的「 Message Delivery Reliability
pub/sub模式把reader放在订阅subscriber端,如下: //写端 import DistributedPubSubMediator.Publish val mediator = DistributedPubSub Publish(persistentId, event,sendOneMessageToEachGroup = true) } } //读端 val mediator = DistributedPubSub
extends Actor { import DistributedPubSubMediator.Publish // activate the extension val mediator = DistributedPubSub String, ref: ActorRef) = new Subscribe(topic, ref) } 订阅操作即向本地Mediator发送Subscribe消息: val mediator = DistributedPubSub 同样:Send和Put都是消息类型,Put代表订阅: val mediator = DistributedPubSub(context.system).mediator // register to extends Actor { import DistributedPubSubMediator.Send // activate the extension val mediator = DistributedPubSub extends Actor { import DistributedPubSubMediator.Send // activate the extension val mediator = DistributedPubSub
channelGroup: ChannelGroup = DefaultChannelGroup(GlobalEventExecutor.INSTANCE) private val mediator = DistributedPubSub.get
注意,「ClusterClientReceptionist」使用「DistributedPubSub」扩展,这在「集群中的分布式发布订阅」中进行了描述。
假设它们会提供不同的叫声作为服务吧: class Cat extends Actor with ActorLogging { //使用pub/sub方式设置 val mediator = DistributedPubSub MIAOM ...******") } } class Dog extends Actor with ActorLogging { //使用pub/sub方式设置 val mediator = DistributedPubSub ) system } } class Cat extends Actor with ActorLogging { //使用pub/sub方式设置 val mediator = DistributedPubSub MIAOM ...******") } } class Dog extends Actor with ActorLogging { //使用pub/sub方式设置 val mediator = DistributedPubSub