首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Pubsub使用带有Akka集群的主题组

Pubsub使用带有Akka集群的主题组
EN

Stack Overflow用户
提问于 2016-12-09 20:05:33
回答 1查看 2K关注 0票数 0

我正在尝试用Akka集群创建一个公共类型的应用程序。我正在阅读医生们关于pubsub的文章,并且正在尝试运行他们的例子。

我的基本工作流程如下:

运行订阅者(谁将成为主导者/领导者)。运行发布者(将向主题发送字符串)。所有订阅者都将收到此消息)。

这是我的密码:

Subscriber.scala

代码语言:javascript
复制
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}

class Subscriber extends Actor with ActorLogging {
  import DistributedPubSubMediator.{ Subscribe, SubscribeAck }
  val mediator = DistributedPubSub(context.system).mediator
  // subscribe to the topic named "content"
  mediator ! Subscribe("content", self)

  def receive = {
    case s: String =>
      log.info("Got {}", s)
    case SubscribeAck(Subscribe("content", None, `self`)) =>
      log.info("subscribing");
  }
}

object SubscriberMain extends App {
  val system = ActorSystem("ClusterSystem")
  val actor = system.actorOf(Props[Subscriber], name="Subscriber")
}

Publisher.scala

代码语言:javascript
复制
import akka.actor.{Actor, ActorSystem, Props}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import com.typesafe.config.ConfigFactory

class Publisher extends Actor {
  import DistributedPubSubMediator.Publish
  // activate the extension
  val mediator = DistributedPubSub(context.system).mediator

  def receive = {
    case in: String => {
      val out = in.toUpperCase
      println(s"Received '$in', transformed to '$out'.")
      mediator ! Publish("content", out)
    }
  }
}

object PublisherMain extends App {
  val config = ConfigFactory.load()
  val system = ActorSystem("ClusterSystem", config.getConfig("PublishApp"))
  val actor = system.actorOf(Props[Publisher], name="Publisher")
  actor ! "something small"
}

application.conf

代码语言:javascript
复制
akka {
  loglevel = "INFO"
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }
  log-dead-letters = 0
  log-dead-letters-during-shutdown = off
}

PublishApp {
  akka {
    loglevel = "DEBUG"
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
      log-sent-messages = on
      log-received-messages = on
    }
  }
}

我发现,当我运行发布服务器的主服务器时,当我试图发布到“内容”主题时,它会挂起,而订阅者不会收到消息。

发行者的日志包含以下内容:

[INFO] [12/09/2016 15:14:35.513] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/system/distributedPubSubMediator] Message [java.lang.String] from Actor[akka://ClusterSystem/user/Publisher#-1478463431] to Actor[akka://ClusterSystem/system/distributedPubSubMediator#-1539813703] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

我一直在仔细研究文档,但似乎主题概念被提到更多的是一种事后思考。

为什么我的订阅者没有收到这条消息?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-12-10 11:48:54

出版商和订阅者实际上都很好。问题来自这样一个事实:您从分布式Pub示例开始。作为先决条件,您需要的当然是集群的设置,以便通过集群分发消息。

Main.scala

代码语言:javascript
复制
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.Cluster
import com.example.Publisher
import com.example.Subscriber

object Main {
  def main(args: Array[String]): Unit = {
    val systemName = "PubSub"
    val system1 = ActorSystem(systemName)
    val joinAddress = Cluster(system1).selfAddress
    Cluster(system1).join(joinAddress)
    val publisher = system1.actorOf(Props[Publisher], "publisher")

    Thread.sleep(5000)
    val system2 = ActorSystem(systemName)
    Cluster(system2).join(joinAddress)
    system2.actorOf(Props[Subscriber], "subscriber")

    Thread.sleep(5000)
    publisher ! "something"
  }
}

如果您现在运行Main.scala,您将看到以下内容:

  • 正在创建system1并加入集群(隐式创建)
  • 正在创建system2并将其加入集群。
  • 将消息发送给发行者,然后发行人将消息转发给中介,然后中介将消息分发到集群中。
  • 订阅者获取消息。

编辑:I将您的application.conf简化为如下所示:

代码语言:javascript
复制
akka {
  loglevel = "INFO"
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  log-dead-letters = 0
  log-dead-letters-during-shutdown = off
}

注意netty.tcp.port = 0 -这将保证您得到分配的随机端口,并且集群成员不会发生端口冲突。在我的输出中可以看到端口49759和49772。

输出:

代码语言:javascript
复制
Running Main 
[INFO] [12/10/2016 12:44:09.175] [main] [akka.remote.Remoting] Starting remoting
[INFO] [12/10/2016 12:44:09.439] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://PubSub@127.0.0.1:49759]
[INFO] [12/10/2016 12:44:09.451] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Starting up...
[INFO] [12/10/2016 12:44:09.537] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Started up successfully
[INFO] [12/10/2016 12:44:09.537] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [12/10/2016 12:44:09.546] [PubSub-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [12/10/2016 12:44:09.549] [PubSub-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Metrics collection has started successfully
[INFO] [12/10/2016 12:44:09.573] [PubSub-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - No seed-nodes configured, manual cluster join required
[INFO] [12/10/2016 12:44:09.594] [PubSub-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Node [akka.tcp://PubSub@127.0.0.1:49759] is JOINING, roles []
[INFO] [12/10/2016 12:44:09.606] [PubSub-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Leader is moving node [akka.tcp://PubSub@127.0.0.1:49759] to [Up]
[INFO] [12/10/2016 12:44:14.598] [main] [akka.remote.Remoting] Starting remoting
[INFO] [12/10/2016 12:44:14.616] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://PubSub@127.0.0.1:49772]
[INFO] [12/10/2016 12:44:14.617] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Starting up...
[INFO] [12/10/2016 12:44:14.626] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Started up successfully
[INFO] [12/10/2016 12:44:14.627] [PubSub-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [12/10/2016 12:44:14.627] [PubSub-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Metrics collection has started successfully
[INFO] [12/10/2016 12:44:14.633] [PubSub-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - No seed-nodes configured, manual cluster join required
[INFO] [12/10/2016 12:44:14.653] [PubSub-akka.actor.default-dispatcher-16] [akka.tcp://PubSub@127.0.0.1:49772/user/subscriber] subscribing
[INFO] [12/10/2016 12:44:14.844] [PubSub-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Node [akka.tcp://PubSub@127.0.0.1:49772] is JOINING, roles []
[INFO] [12/10/2016 12:44:14.920] [PubSub-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Welcome from [akka.tcp://PubSub@127.0.0.1:49759]
[INFO] [12/10/2016 12:44:15.580] [PubSub-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Leader is moving node [akka.tcp://PubSub@127.0.0.1:49772] to [Up]
Received 'something', transformed to 'SOMETHING'.
[INFO] [12/10/2016 12:44:19.673] [PubSub-akka.actor.default-dispatcher-4] [akka.tcp://PubSub@127.0.0.1:49772/user/subscriber] Got SOMETHING
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41068040

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档