首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >错误:从演员到演员的信息没有被传递。[1]遇到的死信。分布式发布子在不工作的集群中工作。

错误:从演员到演员的信息没有被传递。[1]遇到的死信。分布式发布子在不工作的集群中工作。
EN

Stack Overflow用户
提问于 2017-01-25 10:44:20
回答 2查看 2.5K关注 0票数 1

我正试图在不同的集群系统中制造分布式发布-潜艇,但无论我尝试什么,它都无法工作。

我所要做的就是创建一个简单的例子。

( 1)我创造了一个主题,说“内容”。

2)比如jvm A中的一个节点创建主题,订阅主题,以及发布主题的发布者。

3)在不同的节点中,比如在不同端口上的jvm B中,我创建了一个订阅服务器。

4)当我从jvm A向主题发送消息时,我希望jvm B上的订阅者也接收到它,因为它订阅了相同的主题。

任何帮助都会受到极大的赞赏,或者是一个简单的示例,即在不同端口上的不同集群系统中,用Java与订阅者和发布者一起使用分布式pub子程序。

下面是app1及其配置文件的代码。

代码语言:javascript
复制
 public class App1{

    public static void main(String[] args) {

    System.setProperty("akka.remote.netty.tcp.port", "2551");
    ActorSystem clusterSystem = ActorSystem.create("ClusterSystem");
    ClusterClientReceptionist clusterClientReceptionist1 = ClusterClientReceptionist.get(clusterSystem);
    ActorRef subcriber1=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber1");
    clusterClientReceptionist1.registerSubscriber("content", subcriber1);
    ActorRef publisher1=clusterSystem.actorOf(Props.create(Publisher.class), "publisher1");
    clusterClientReceptionist1.registerSubscriber("content", publisher1);
    publisher1.tell("testMessage1", ActorRef.noSender());

    }
}

app1.confi

代码语言:javascript
复制
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
  hostname = "127.0.0.1"
  port = 2551
  }
}
cluster {
seed-nodes = [
  "akka.tcp://ClusterSystem@127.0.0.1:2551"
]
auto-down-unreachable-after = 10s
}
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub",
"akka.contrib.pattern.ClusterReceptionistExtension"]
  akka.cluster.pub-sub {
name = distributedPubSubMediator
role = ""
routing-logic = random
gossip-interval = 1s
removed-time-to-live = 120s
max-delta-elements = 3000
use-dispatcher = ""
}

akka.cluster.client.receptionist {
name = receptionist
role = ""
number-of-contacts = 3
response-tunnel-receive-timeout = 30s
use-dispatcher = ""
heartbeat-interval = 2s
acceptable-heartbeat-pause = 13s
failure-detection-interval = 2s
  }
}

app2及其配置文件的代码

代码语言:javascript
复制
public class App
{
   public static Set<ActorPath> initialContacts() {
   return new HashSet<ActorPath>(Arrays.asList(          
   ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")));
}

public static void main( String[] args ) {
    System.setProperty("akka.remote.netty.tcp.port", "2553");
    ActorSystem clusterSystem = ActorSystem.create("ClusterSystem2");
    ClusterClientReceptionist clusterClientReceptionist2 = ClusterClientReceptionist.get(clusterSystem);
    final ActorRef clusterClient = clusterSystem.actorOf(ClusterClient.props(ClusterClientSettings.create(
            clusterSystem).withInitialContacts(initialContacts())), "client"); 
    ActorRef subcriber2=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber2");
    clusterClientReceptionist2.registerSubscriber("content", subcriber2);
    ActorRef publisher2=clusterSystem.actorOf(Props.create(Publisher.class), "publisher2");
    publisher2.tell("testMessage2", ActorRef.noSender());
    clusterClient.tell(new ClusterClient.Send("/user/publisher1", "hello", true), null);

 }
}            

app2.confi

代码语言:javascript
复制
    akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
  hostname = "127.0.0.1"
  port = 2553
  }
}
cluster {
seed-nodes = [
  "akka.tcp://ClusterSystem@127.0.0.1:2553"
]
auto-down-unreachable-after = 10s
}
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub",
"akka.contrib.pattern.ClusterReceptionistExtension"]
  akka.cluster.pub-sub {
name = distributedPubSubMediator
role = ""
routing-logic = random
gossip-interval = 1s
removed-time-to-live = 120s
max-delta-elements = 3000
use-dispatcher = ""
}

akka.cluster.client.receptionist {
name = receptionist
role = ""
number-of-contacts = 3
response-tunnel-receive-timeout = 30s
use-dispatcher = ""
heartbeat-interval = 2s
acceptable-heartbeat-pause = 13s
failure-detection-interval = 2s
  }
}

发布服务器和订阅服务器类对于两个应用程序都是相同的,如下所示。

出版商:

代码语言:javascript
复制
 public class Publisher extends UntypedActor {
 private final ActorRef mediator =
        DistributedPubSub.get(getContext().system()).mediator();

 @Override
 public void onReceive(Object msg) throws Exception {
     if (msg instanceof String) {
         mediator.tell(new DistributedPubSubMediator.Publish("events", msg), getSelf());
    } else {
        unhandled(msg);
    }
 }

}

订户:

代码语言:javascript
复制
public class Subscriber extends UntypedActor {
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

public Subscriber(){

    ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
    mediator.tell(new DistributedPubSubMediator.Subscribe("events", getSelf()), getSelf());

}

public void onReceive(Object msg) throws Throwable {
    if (msg instanceof String) {
        log.info("Got: {}", msg);
    } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
        log.info("subscribing");
    } else {
        unhandled(msg);
    }
}
}

在运行两个应用程序时,我在接收端应用程序中出现了这个错误。死信遇到了

代码语言:javascript
复制
[ClusterSystem-akka.actor.default-dispatcher-21] INFO  akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ClusterSystem/system/receptionist/akka.tcp%3A%2F%2FClusterSystem2%40127.0.0.1%3A2553%2FdeadLetters#188707926] to Actor[akka://ClusterSystem/system/distributedPubSubMediator#1119990682] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

在发送方,应用程序消息成功地显示在日志中。

代码语言:javascript
复制
[ClusterSystem2-akka.actor.default-dispatcher-22] DEBUG akka.cluster.client.ClusterClient - Sending buffered messages to receptionist
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-02-01 23:05:35

以这种方式使用ClusterClient是没有意义的,也与使用分布式发布子没有任何关系,因为您的两个节点都是集群的一部分,您可以直接使用分布式发布子api。

下面是一个简单的main,包括配置,使用您的确切发行者和订阅者角色创建一个两个节点集群,并按预期工作:

代码语言:javascript
复制
public static void main(String[] args) throws Exception {

  final Config config = ConfigFactory.parseString(
    "akka.actor.provider=cluster\n" +
    "akka.remote.netty.tcp.port=2551\n" +
    "akka.cluster.seed-nodes = [ \"akka.tcp://ClusterSystem@127.0.0.1:2551\"]\n");

  ActorSystem node1 = ActorSystem.create("ClusterSystem", config);
  ActorSystem node2 = ActorSystem.create("ClusterSystem",
    ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
      .withFallback(config));

  // wait a bit for the cluster to form
  Thread.sleep(3000);

  ActorRef subscriber = node1.actorOf(
    Props.create(Subscriber.class),
    "subscriber");

  ActorRef publisher = node2.actorOf(
    Props.create(Publisher.class), 
    "publisher");

  // wait a bit for the subscription to be gossiped
  Thread.sleep(3000);

  publisher.tell("testMessage1", ActorRef.noSender());
}

请注意,分布式pub sub不提供任何传递保证,因此如果您在中介器相互联系之前发送消息,消息就会丢失(因此,Thread.sleep语句是ofc语句,您不应该在实际代码中这样做)。

票数 1
EN

Stack Overflow用户

发布于 2021-03-11 03:09:49

我认为问题在于,您的演员系统有不同的名称ClusterSystem和ClusterSystem2。至少我有同样的问题,因为集群中有两个不同的服务,但是我用不同的名称命名每个服务中的系统。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41849495

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档