首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Akka连接中止

Akka连接中止
EN

Stack Overflow用户
提问于 2013-10-19 18:27:33
回答 1查看 1.6K关注 0票数 1

我对Akka有奇怪的行为,问题是连接重置,这是由于TcpMessage.abort()从处理程序或Terminating处理程序中显式调用造成的。对等方没有接收到Tcp.ConnectionClosed事件。示例:

处理程序onReceive

代码语言:javascript
复制
@Override
public void onReceive(Object msg) throws Exception {
    if (msg instanceof Tcp.ConnectionClosed) {
        log.info("Server ConnectionClosed: {}", msg);
        getContext().stop(getSelf());
    } else if (msg instanceof Tcp.Received){
        log.info("Aborting connection");
        getSender().tell(TcpMessage.abort(), getSelf());
    }
}

测试代码

代码语言:javascript
复制
new JavaTestKit(system) {{
     getSystem().actorOf(Props.create(Server.class, getRef()), "Server");
     Tcp.Bound bound = expectMsgClass(Tcp.Bound.class);

     ActorRef client = getSystem().actorOf(
     Props.create(Client.class, bound.localAddress(), getRef()), "Client");
     watch(client);
     expectMsgClass(Tcp.Connected.class);
     client.tell(new Message.Write(), getRef());

     expectTerminated(client);
}};

执行后记录

代码语言:javascript
复制
[INFO] [10/19/2013 22:13:22.730] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Client] Client Connected
[INFO] [10/19/2013 22:13:22.736] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Client] Sending Write to Handler
[INFO] [10/19/2013 22:13:22.767] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Server/$a] Aborting connection
[INFO] [10/19/2013 22:13:22.774] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Server/$a] Server ConnectionClosed: Aborted

java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg: Terminated Actor[akka://actorSystem/user/Client#423174850]

如果我在处理程序中将TcpMessage.abort()更改为TcpMessage.close()

代码语言:javascript
复制
[INFO] [10/19/2013 22:17:06.243] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Client Connected
[INFO] [10/19/2013 22:17:06.249] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Sending Write to Handler
[INFO] [10/19/2013 22:17:06.278] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$a] Aborting connection
[INFO] [10/19/2013 22:17:06.288] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/Client] Client ConnectionClosed: PeerClosed
[INFO] [10/19/2013 22:17:06.288] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Server/$a] Server ConnectionClosed: Closed

服务器处理程序如何向远程客户端发送信号--错误关闭了什么连接?如果我将中止从客户端发送到服务器,一切都可以正常工作。

编辑:

好的,找到了问题的根源,只有在尝试中止之前发送写命令时,中止命令才能正常工作。在客户端和服务器上。现在我需要找出原因。

EDIT2:

似乎中止消息被卡住了,如果我创建了新的客户端,那么就可以到达第一个客户端,并向第二个客户端发送相同的写命令,执行客户端预期接收到的中止。

代码语言:javascript
复制
new JavaTestKit(system) {{
    getSystem().actorOf(Props.create(Server.class, getRef()), "Server");
    Tcp.Bound bound = expectMsgClass(Tcp.Bound.class);

    ActorRef client = getSystem().actorOf(
            Props.create(Client.class, bound.localAddress(), getRef()), "Client");
    watch(client);
    expectMsgClass(Tcp.Connected.class);
    client.tell(new Message.Write(), getRef());
    expectNoMsg();

    ActorRef client2 = getSystem().actorOf(
           Props.create(Client.class, bound.localAddress(), getRef()), "Client2");
    watch(client2);
    expectMsgClass(Tcp.Connected.class);

    expectTerminated(client);

    client2.tell(new Message.Write(), getRef());

    expectTerminated(client2);
    expectNoMsg();
}};

和日志:

代码语言:javascript
复制
[INFO] [10/20/2013 00:48:05.923] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Client Connected
[INFO] [10/20/2013 00:48:05.929] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Sending Write to Handler
[INFO] [10/20/2013 00:48:05.959] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$a] Aborting connection, connection actor: [Actor[akka://actorSystem/system/IO-TCP/selectors/$a/2#1767183113]]
[INFO] [10/20/2013 00:48:05.966] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/Server/$a] Server ConnectionClosed: Aborted
[INFO] [10/20/2013 00:48:08.930] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/Client2] Client Connected
[INFO] [10/20/2013 00:48:08.934] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Client] Client ConnectionClosed: ErrorClosed(An existing connection was forcibly closed by the remote host)
[INFO] [10/20/2013 00:48:08.936] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client2] Sending Write to Handler
[INFO] [10/20/2013 00:48:08.937] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$b] Aborting connection, connection actor: [Actor[akka://actorSystem/system/IO-TCP/selectors/$a/4#-598138943]]
[INFO] [10/20/2013 00:48:08.938] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$b] Server ConnectionClosed: Aborted
[INFO] [10/20/2013 00:48:08.938] [actorSystem-akka.actor.default-dispatcher-7] [akka://actorSystem/user/Client2] Client ConnectionClosed: ErrorClosed(An existing connection was forcibly closed by the remote host)

EDIT3:

为了让它更清楚一点,问题是:我如何能够告诉我的远程参与者处理程序什么对等程序崩溃了,并ErrorClosed连接(它在远程端执行)。最后一条ErrorClosed消息总是被卡住,它没有消失,它只是位于选择器的某个位置,只在我执行另一个网络操作后才被推送到对等点(写入或连接另一个参与者到服务器)。只有在中止之前执行Write命令时,才会出现此行为。这种行为与操作系统无关;我在Windows和Linux机器上尝试了同样的结果。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2013-10-23 00:14:28

我没有为这个问题找到可靠的解决方案,但我做了很少的事情来最大限度地减少总的影响。

  1. 不要显式使用RST (这包括中止命令)。始终与鳍紧密相连(关闭)。即使在出错的情况下。
  2. 如果操作人员因错误而死亡,则信号连接关闭,因此可以正确关闭连接。
  3. 使用异步req数据传输机制来传输大量数据(ZMQ)。

心跳演员也可以实现,但似乎不是很好的主意,特别是Akka IO选择器在TCP套接字上自己运行心跳?

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/19469515

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档