首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >连接到自定义Akka.NET传输的故障

连接到自定义Akka.NET传输的故障
EN

Stack Overflow用户
提问于 2022-09-27 16:00:48
回答 1查看 23关注 0票数 0

我正在尝试实现一个自定义的Akka.NET Transport。目前,我有以下几点:

客户端程序(基于https://getakka.net/articles/remoting/messaging.html):

代码语言:javascript
复制
using Akka.Actor;
using Akka.Configuration;
using AkkaTest.Messages;

namespace AkkaTest;

static class Program
{
    static void Main()
    {
        var config = ConfigurationFactory.ParseString(@"
akka {  
    actor {
        provider = remote
    }
    remote {
        enabled-transports = [""akka.remote.myproto""]
        myproto {
            transport-class = ""AkkaTest.Messages.MyTransport, AkkaTest.Messages""
            applied-adapters = []
            transport-protocol = myproto
            port = 0 # bound to a dynamic port assigned by the OS
            hostname = localhost
        }
    }
}");

        var system = ActorSystem.Create("MyActorSystem", config);
        var myActor = system.ActorOf<SendActor>();
        myActor.Tell(new StartRemoting());
        while (true)
        {
            // null task
        }
    }
}

// Initiates remote contact 
public class SendActor : ReceiveActor
{
    public SendActor()
    {
        Receive<StartRemoting>(s => {
            Context.ActorSelection("akka.myproto://RemoteSys@localhost:8081/user/Echo")
            .Tell(new Msg("hi!"));
        });
        Receive<Msg>(msg => {
            Console.WriteLine("Received {0} from {1}", msg.Content, Sender);
        });
    }
}

服务器:

代码语言:javascript
复制
using Akka.Actor;
using Akka.Configuration;
using AkkaTest.Messages;

namespace AkkaTest.Server;

static class Program
{
    static void Main()
    {
        var config = ConfigurationFactory.ParseString(@"
akka {  
    actor {
        provider = remote
    }
    remote {
        enabled-transports = [""akka.remote.myproto""]
        myproto {
            transport-class = ""AkkaTest.Messages.MyTransport, AkkaTest.Messages""
            applied-adapters = []
            transport-protocol = myproto
            port = 8081 #bound to a specific port
            hostname = localhost
        }
    }
}");

        var system = ActorSystem.Create("RemoteSys", config);
        system.ActorOf<EchoActor>("Echo");
        while (true)
        {
            // null task
        }
    }
}

// Runs in a separate process from SendActor
public class EchoActor : ReceiveActor
{
    public EchoActor()
    {
        Receive<Msg>(msg => {
            // echo message back to sender
            Sender.Tell(msg);
        });
    }
}

消息:

代码语言:javascript
复制
namespace AkkaTest.Messages;

// Written into a shared library
public class Msg
{
    public Msg(string content)
    {
        this.Content = content;
    }

    public string Content { get; set; }
}

public class StartRemoting { }

删除关联事件侦听器:

代码语言:javascript
复制
namespace AkkaTest.Messages
{
    using Akka.Remote.Transport;

    public class MyAel : IAssociationEventListener
    {
        public void Notify(IAssociationEvent ev)
        {
            throw new NotImplementedException();
        }
    }
}

最后,被撞坏的交通工具:

代码语言:javascript
复制
namespace AkkaTest.Messages
{
    using Akka.Actor;
    using Akka.Configuration;
    using Akka.Remote.Transport;
    using System;
    using System.Threading.Tasks;

    public class MyTransport : Transport
    {
        public MyTransport(ActorSystem system, Config config)
        {
            this.System = system;
            this.Config = config;
        }

        public override Task<AssociationHandle> Associate(Address remoteAddress)
        {
            throw new NotImplementedException();
        }

        public override bool IsResponsibleFor(Address remote)
        {
            return remote.Protocol.ToUpper() == this.Config.GetString("transport-protocol").ToUpper();
        }

        public override Task<(Address, TaskCompletionSource<IAssociationEventListener>)> Listen()
        {
            var tcs = new TaskCompletionSource<IAssociationEventListener>();
            tcs.TrySetResult(new MyAel());
            var protocol = this.Config.GetString("transport-protocol");
            var host = this.Config.GetString("hostname");
            var port = this.Config.GetInt("port");
            if (port == 0)
            {
                port = 62978;
            }
            return Task.FromResult((
                new Address(
                    protocol,
                    this.System.Name,
                    host,
                    port),
                tcs));
        }

        public override Task<bool> Shutdown()
        {
            throw new NotImplementedException();
        }
    }
}

以前,我尝试了一个非常类似的客户机/服务器设置与TCP,而不是我的自定义传输。这是服务器输出:

代码语言:javascript
复制
[INFO][9/27/2022 2:29:15 PM][Thread 0001][remoting (akka://RemoteSys)] Starting remoting
[INFO][9/27/2022 2:29:15 PM][Thread 0001][remoting (akka://RemoteSys)] Remoting started; listening on addresses : [akka.tcp://RemoteSys@localhost:8081]
[INFO][9/27/2022 2:29:15 PM][Thread 0001][remoting (akka://RemoteSys)] Remoting now listens on addresses: [akka.tcp://RemoteSys@localhost:8081]

客户:

代码语言:javascript
复制
[INFO][9/27/2022 2:29:29 PM][Thread 0001][remoting (akka://MyActorSystem)] Starting remoting
[INFO][9/27/2022 2:29:30 PM][Thread 0001][remoting (akka://MyActorSystem)] Remoting started; listening on addresses : [akka.tcp://MyActorSystem@localhost:62978]
[INFO][9/27/2022 2:29:30 PM][Thread 0001][remoting (akka://MyActorSystem)] Remoting now listens on addresses: [akka.tcp://MyActorSystem@localhost:62978]
Received hi! from [akka.tcp://RemoteSys@localhost:8081/user/Echo#2122916400]

所以这就是我期望的行为。当我使用自定义传输启动服务器项目时,将得到以下控制台输出:

代码语言:javascript
复制
[INFO][9/27/2022 3:49:50 PM][Thread 0001][remoting (akka://RemoteSys)] Starting remoting
[INFO][9/27/2022 3:49:50 PM][Thread 0001][remoting (akka://RemoteSys)] Remoting started; listening on addresses : [akka.myproto://RemoteSys@localhost:8081]
[INFO][9/27/2022 3:49:50 PM][Thread 0001][remoting (akka://RemoteSys)] Remoting now listens on addresses: [akka.myproto://RemoteSys@localhost:8081]

这正是我所期望的。与我在TCP中尝试相同设置时得到的输出非常相似。但是,当我尝试启动客户机时,会出现以下错误:

代码语言:javascript
复制
[INFO][9/27/2022 3:50:48 PM][Thread 0001][remoting (akka://MyActorSystem)] Starting remoting
[INFO][9/27/2022 3:50:48 PM][Thread 0001][remoting (akka://MyActorSystem)] Remoting started; listening on addresses : [akka.myproto://MyActorSystem@localhost:62978]
[INFO][9/27/2022 3:50:48 PM][Thread 0001][remoting (akka://MyActorSystem)] Remoting now listens on addresses: [akka.myproto://MyActorSystem@localhost:62978]
[ERROR][9/27/2022 3:50:49 PM][Thread 0009][akka://MyActorSystem/user/$a] No transport is loaded for protocol: [akka.myproto], available protocols: [akka.]
Cause: Akka.Remote.RemoteTransportException: No transport is loaded for protocol: [akka.myproto], available protocols: [akka.]
   at Akka.Remote.Remoting.LocalAddressForRemote(IDictionary`2 transportMapping, Address remote)
   at Akka.Remote.RemoteActorRefProvider.RootGuardianAt(Address address)
   at Akka.Actor.ActorRefFactoryShared.ActorSelection(String path, ActorSystem system, IActorRef lookupRoot)
   at AkkaTest.SendActor.<>c.<.ctor>b__0_0(StartRemoting s) in ...\AkkaTest\AkkaTest\Program.cs:line 44
   at lambda_method7(Closure , Object , Action`1 , Action`1 )
   at Akka.Actor.ReceiveActor.OnReceive(Object message)
   at Akka.Actor.UntypedActor.Receive(Object message)
   at Akka.Actor.ActorBase.AroundReceive(Receive receive, Object message)
   at Akka.Actor.ActorCell.ReceiveMessage(Object message)
   at Akka.Actor.ActorCell.Invoke(Envelope envelope)

无论服务器是否已启动,我都会遇到同样的问题,因此几乎没有试图建立关联。我怀疑问题可能是我在这一点上有太多的问题(我特别想知道这个问题是否与tcs.TrySetResult(new MyAel());行有关),但我没有太多要填写的内容;文档有点稀疏。据我所知,没有任何NotImplementedExceptions被击中,也没有任何先前的断点。在这个阶段我错过了什么?

EN

回答 1

Stack Overflow用户

发布于 2022-09-27 16:28:11

找到(当前)问题。检查DotnettyTransport的元数据时,我注意到它覆盖了虚拟SchemeIdentifier。我在我的计算机中设置了SchemeIdentifier

代码语言:javascript
复制
public MyTransport(ActorSystem system, Config config)
{
    this.System = system;
    this.Config = config;
    this.SchemeIdentifier = this.Config.GetString("transport-protocol");
}

在我这样做之后,我的IsResponsibleFor实现被调用了。这导致了我的下一个问题,传入的remote.Protocol属性没有像我预期的那样返回myproto,而是返回akka.myproto。容易修复的:

代码语言:javascript
复制
public override bool IsResponsibleFor(Address remote)
{
    return remote.Protocol.ToUpper() == $"AKKA.{this.SchemeIdentifier.ToUpper()}";
}

等会儿再整理一下。既然这样做了,我的Associate覆盖就会被调用,当然还会抛出它的NotImplementedException

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73870530

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档