在探索新API akka.actor.typed,中的分布式发布订阅时,我们看到分布式发布订阅是通过使用一个参与者akka.actor.typed.pubsub.Topic.表示每个发布子主题来实现的。为了模拟同样的情况,创建了一个本地akka集群,其中3个种子节点,其中一个充当发布者,另外两个是使用者。在使用时,我们看到两个订户都在接收相同的数据,即去重复不会发生。请查找发行者的示例代码:
object PubApp {
def pubsubImplementor() = {
val pubsubGurdian = Behaviors.setup[Unit] { (context) =>
val publisher = context.spawn(PublisherEnd(), "publisher")
Thread.sleep(20000) //Waiting for subscriber to go up
(1 to 20).map { i =>
Thread.sleep(1000)
publisher ! Data(i)
}
Behaviors.same
}
implicit val config = ConfigFactory.parseString(
s"""
|akka.remote.artery.canonical.port = 2551
|""".stripMargin
).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))
ActorSystem(pubsubGurdian, "SamikCluster", config)
}
def main(args: Array[String]): Unit = {
pubsubImplementor()
}
}object PublisherEnd {
case class Data(i : Int) extends Command
def apply():Behavior[Command] = Behaviors.setup{(context) =>
val topic = context.spawn(Topic[Command]("pub-sub"), "publisherActor")
Behaviors.receive{(context,message) =>
message match {
case m@Data(i) =>
context.log.info(s"message: ${m.i} sent to Sink")
topic ! Topic.Publish(m)
Behaviors.same
}
}
}
}请查找订阅者/消费者的示例代码:
object SubApp1 {
def subImplementer() = {
val subGurdian = Behaviors.setup[Unit] { (context) =>
val subscriber1 = context.spawn(SubsriberEnd(), "subscriber1")
Behaviors.same
}
implicit val config = ConfigFactory.parseString(
s"""
|akka.remote.artery.canonical.port = 2552
|""".stripMargin
).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))
ActorSystem(subGurdian, "SamikCluster", config)
}
def main(args: Array[String]): Unit = {
subImplementer()
}
}
object SubApp2 {
def subImplementer() = {
val subGurdian = Behaviors.setup[Unit] { (context) =>
val subscriber2 = context.spawn(SubsriberEnd(), "subscriber2")
Behaviors.same
}
implicit val config = ConfigFactory.parseString(
s"""
|akka.remote.artery.canonical.port = 2553
|""".stripMargin
).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))
ActorSystem(subGurdian, "SamikCluster", config)
}
def main(args: Array[String]): Unit = {
subImplementer()
}
}object SubsriberEnd {
def apply(): Behavior[Command] = Behaviors.setup[Command] { (context) =>
val topic = context.spawn(Topic[Command]("pub-sub"), "subscriberActor")
topic ! Topic.Subscribe(context.self)
Behaviors.receive { (context, message) =>
message match {
case m@Data(i) =>
context.log.info(s"[${context.self.path}] received message: ${m.i} in Sink")
Behaviors.same
}
}
}
}我从publisher获得的输出:
09:40:07.566 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 1 sent to Sink
09:40:08.567 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 2 sent to Sink
09:40:09.570 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 3 sent to Sink
09:40:10.570 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 4 sent to Sink
09:40:11.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 5 sent to Sink
09:40:12.576 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 6 sent to Sink
09:40:13.581 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 7 sent to Sink
09:40:14.581 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 8 sent to Sink
09:40:15.582 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 9 sent to Sink
09:40:16.582 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 10 sent to Sink
09:40:17.583 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 11 sent to Sink
09:40:18.585 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 12 sent to Sink
09:40:19.586 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 13 sent to Sink
09:40:20.590 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 14 sent to Sink
09:40:21.591 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 15 sent to Sink
09:40:22.591 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 16 sent to Sink
09:40:23.596 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 17 sent to Sink
09:40:24.596 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 18 sent to Sink
09:40:25.599 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 19 sent to Sink
09:40:26.602 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 20 sent to Sink以下是消费者的输出: Consumer1,即SubApp1
09:40:07.734 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 1 in Sink
09:40:08.570 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 2 in Sink
09:40:09.572 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 3 in Sink
09:40:10.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 4 in Sink
09:40:11.574 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 5 in Sink
09:40:12.578 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 6 in Sink
09:40:13.582 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 7 in Sink
09:40:14.584 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 8 in Sink
09:40:15.585 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 9 in Sink
09:40:16.583 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 10 in Sink
09:40:17.584 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 11 in Sink
09:40:18.587 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 12 in Sink
09:40:19.588 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 13 in Sink
09:40:20.592 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 14 in Sink
09:40:21.593 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 15 in Sink
09:40:22.593 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 16 in Sink
09:40:23.598 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 17 in Sink
09:40:24.598 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 18 in Sink
09:40:25.600 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 19 in Sink
09:40:26.604 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 20 in SinkConsumer2即SubApp2
09:40:07.734 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 1 in Sink
09:40:08.570 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 2 in Sink
09:40:09.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 3 in Sink
09:40:10.572 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 4 in Sink
09:40:11.574 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 5 in Sink
09:40:12.578 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 6 in Sink
09:40:13.582 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 7 in Sink
09:40:14.584 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 8 in Sink
09:40:15.585 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 9 in Sink
09:40:16.585 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 10 in Sink
09:40:17.584 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 11 in Sink
09:40:18.587 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 12 in Sink
09:40:19.588 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 13 in Sink
09:40:20.592 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 14 in Sink
09:40:21.593 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 15 in Sink
09:40:22.594 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 16 in Sink
09:40:23.598 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 17 in Sink
09:40:24.599 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 18 in Sink
09:40:25.600 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 19 in Sink
09:40:26.604 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 20 in Sink同样的消息被发布在我们不想要的两个订阅者中(就像广播一样)。请您告诉我,我们如何才能实现重复,使同一条消息不应在不同的订户中发布两次。
发布于 2022-03-25 23:59:27
分布式Pub Sub是为一对多的用例设计的:它将尝试在发布时(大约)将发布到主题的消息传递给该主题的每个订阅者。它不是设计成去重复的。
使用分布式Pub所能得到的最接近的是一个逻辑主题除了主主题之外还有多个子主题。发布者发布到主主题,参与者(可能是集群单例)订阅该主题,并将每条消息重新发布到其中一个子主题(或者生产者可以生成到子主题)。
请注意,分布式Pub Sub并不能对通过它发送的消息的处理频率提供任何有用的保证:它最多为每个订阅者发送一次消息,但每条消息都可能由任意多个订阅者发送和处理,因此它是至少零次的保证。
https://stackoverflow.com/questions/71567181
复制相似问题