首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >当一个节点上有多个“订阅者”时,接待员不会执行重复。

当一个节点上有多个“订阅者”时,接待员不会执行重复。
EN

Stack Overflow用户
提问于 2022-03-22 05:11:39
回答 1查看 63关注 0票数 1

在探索新API akka.actor.typed,中的分布式发布订阅时,我们看到分布式发布订阅是通过使用一个参与者akka.actor.typed.pubsub.Topic.表示每个发布子主题来实现的。为了模拟同样的情况,创建了一个本地akka集群,其中3个种子节点,其中一个充当发布者,另外两个是使用者。在使用时,我们看到两个订户都在接收相同的数据,即去重复不会发生。请查找发行者的示例代码:

代码语言:javascript
复制
object PubApp {

  def pubsubImplementor() = {

    val pubsubGurdian = Behaviors.setup[Unit] { (context) =>
      val publisher = context.spawn(PublisherEnd(), "publisher")

      Thread.sleep(20000) //Waiting for subscriber to go up
      (1 to 20).map { i =>
        Thread.sleep(1000)
        publisher ! Data(i)
      }
      Behaviors.same
    }

    implicit val config = ConfigFactory.parseString(
      s"""
         |akka.remote.artery.canonical.port = 2551
         |""".stripMargin
    ).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))
    ActorSystem(pubsubGurdian, "SamikCluster", config)
  }


  def main(args: Array[String]): Unit = {
    pubsubImplementor()
  }

}
代码语言:javascript
复制
object PublisherEnd {

  case class Data(i : Int) extends Command
  
  def apply():Behavior[Command] = Behaviors.setup{(context) =>
    
    val topic = context.spawn(Topic[Command]("pub-sub"), "publisherActor")

    Behaviors.receive{(context,message) =>
      message match {
       
        case m@Data(i) =>
          context.log.info(s"message: ${m.i} sent to Sink")
          topic ! Topic.Publish(m)
          Behaviors.same
       
      }

    }
  }
}

请查找订阅者/消费者的示例代码:

代码语言:javascript
复制
object SubApp1 {

  def subImplementer() = {
    val subGurdian = Behaviors.setup[Unit] { (context) =>

      val subscriber1 = context.spawn(SubsriberEnd(), "subscriber1")
      Behaviors.same
    }

    implicit val config = ConfigFactory.parseString(
      s"""
         |akka.remote.artery.canonical.port = 2552
         |""".stripMargin
    ).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))
    ActorSystem(subGurdian, "SamikCluster", config)
  }

  def main(args: Array[String]): Unit = {
    subImplementer()
  }
}

object SubApp2 {

  def subImplementer() = {
    val subGurdian = Behaviors.setup[Unit] { (context) =>

      val subscriber2 = context.spawn(SubsriberEnd(), "subscriber2")
      Behaviors.same
    }

    implicit val config = ConfigFactory.parseString(
      s"""
         |akka.remote.artery.canonical.port = 2553
         |""".stripMargin
    ).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))
    ActorSystem(subGurdian, "SamikCluster", config)
  }

  def main(args: Array[String]): Unit = {
    subImplementer()
  }
}
代码语言:javascript
复制
object SubsriberEnd {


  def apply(): Behavior[Command] = Behaviors.setup[Command] { (context) =>

    val topic = context.spawn(Topic[Command]("pub-sub"), "subscriberActor")
    topic ! Topic.Subscribe(context.self)


    Behaviors.receive { (context, message) =>
      message match {
        case m@Data(i) =>
          context.log.info(s"[${context.self.path}] received message: ${m.i} in Sink")
          Behaviors.same
      }

    }
  }

}

我从publisher获得的输出:

代码语言:javascript
复制
09:40:07.566 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 1 sent to Sink
09:40:08.567 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 2 sent to Sink
09:40:09.570 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 3 sent to Sink
09:40:10.570 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 4 sent to Sink
09:40:11.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 5 sent to Sink
09:40:12.576 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 6 sent to Sink
09:40:13.581 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 7 sent to Sink
09:40:14.581 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 8 sent to Sink
09:40:15.582 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 9 sent to Sink
09:40:16.582 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 10 sent to Sink
09:40:17.583 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 11 sent to Sink
09:40:18.585 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 12 sent to Sink
09:40:19.586 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 13 sent to Sink
09:40:20.590 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 14 sent to Sink
09:40:21.591 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 15 sent to Sink
09:40:22.591 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 16 sent to Sink
09:40:23.596 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 17 sent to Sink
09:40:24.596 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 18 sent to Sink
09:40:25.599 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 19 sent to Sink
09:40:26.602 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 20 sent to Sink

以下是消费者的输出: Consumer1,即SubApp1

代码语言:javascript
复制
09:40:07.734 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 1 in Sink
09:40:08.570 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 2 in Sink
09:40:09.572 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 3 in Sink
09:40:10.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 4 in Sink
09:40:11.574 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 5 in Sink
09:40:12.578 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 6 in Sink
09:40:13.582 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 7 in Sink
09:40:14.584 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 8 in Sink
09:40:15.585 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 9 in Sink
09:40:16.583 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 10 in Sink
09:40:17.584 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 11 in Sink
09:40:18.587 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 12 in Sink
09:40:19.588 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 13 in Sink
09:40:20.592 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 14 in Sink
09:40:21.593 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 15 in Sink
09:40:22.593 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 16 in Sink
09:40:23.598 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 17 in Sink
09:40:24.598 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 18 in Sink
09:40:25.600 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 19 in Sink
09:40:26.604 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 20 in Sink

Consumer2即SubApp2

代码语言:javascript
复制
09:40:07.734 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 1 in Sink
09:40:08.570 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 2 in Sink
09:40:09.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 3 in Sink
09:40:10.572 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 4 in Sink
09:40:11.574 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 5 in Sink
09:40:12.578 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 6 in Sink
09:40:13.582 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 7 in Sink
09:40:14.584 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 8 in Sink
09:40:15.585 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 9 in Sink
09:40:16.585 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 10 in Sink
09:40:17.584 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 11 in Sink
09:40:18.587 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 12 in Sink
09:40:19.588 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 13 in Sink
09:40:20.592 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 14 in Sink
09:40:21.593 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 15 in Sink
09:40:22.594 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 16 in Sink
09:40:23.598 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 17 in Sink
09:40:24.599 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 18 in Sink
09:40:25.600 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 19 in Sink
09:40:26.604 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 20 in Sink

同样的消息被发布在我们不想要的两个订阅者中(就像广播一样)。请您告诉我,我们如何才能实现重复,使同一条消息不应在不同的订户中发布两次。

EN

回答 1

Stack Overflow用户

发布于 2022-03-25 23:59:27

分布式Pub Sub是为一对多的用例设计的:它将尝试在发布时(大约)将发布到主题的消息传递给该主题的每个订阅者。它不是设计成去重复的。

使用分布式Pub所能得到的最接近的是一个逻辑主题除了主主题之外还有多个子主题。发布者发布到主主题,参与者(可能是集群单例)订阅该主题,并将每条消息重新发布到其中一个子主题(或者生产者可以生成到子主题)。

请注意,分布式Pub Sub并不能对通过它发送的消息的处理频率提供任何有用的保证:它最多为每个订阅者发送一次消息,但每条消息都可能由任意多个订阅者发送和处理,因此它是至少零次的保证。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71567181

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档