我们将Akka.net框架用于能源领域的高伸缩性应用程序。
我们将Akka.net用于各种任务,主要采用以下形式:
var system=ActorSystem.Create("actorSystem");
var props=Props.Create<UpdateActor>();
.WithRouter(new SmallesMailboxPool(100));
var actorRef=system.ActorOf(props,"UpdateActor");
foreach(var timerow in timeRowList)
actorRef.Tell(timerow)不幸的是,在很多情况下,Akka.net框架的伸缩性非常差。CPU负载仅为12%。显然,只使用了一个线程或几个线程。
如何将Akka.Net配置为使用多个线程来处理参与者?
发布于 2018-11-23 16:48:35
这是一个有根据的猜测,但如果您使用的是SmallestMailboxPool,请记住,它在非阻塞I/O方面非常糟糕,在存储方面也非常糟糕。
第一件事通常是检查是否没有阻塞操作(如同步I/O,调用AsyncMethod().Result或Thread.Sleep),这将阻塞当前线程,有效地防止它被其他参与者使用。
另一个问题是非常特定于最小的邮箱路由器,它与隐藏和持久参与者相关。
储藏
存储是处理多步骤操作的一种流行方法。这种模式可以表示为下面的模式。
public class MyActor : ActorBase, IWithUnboundedStash
{
public IStash Stash { get; set; }
public Receive Active(State workUnit) => message =>
{
switch(message)
{
case DoWork:
// stash all messages not related to current work
Stash.Stash(message);
return true;
case WorkDone done:
// when current unit of work is done, unstash pending messages
Stash.UnstashAll();
Become(Idle);
return true;
}
};
public bool Idle(object message)
{
switch(message)
{
case DoWork work:
StartWork(work.State);
Become(Active(work.State)); //continue work in new behavior
return true;
default:
return false;
}
}
public bool Receive(object message) => Idle(message);
}这种情况很常见,例如persistent actors在其恢复过程中使用它。问题是,它正在清理邮箱,这会让SmallestMailbox路由器误以为该参与者的邮箱是空的,而实际上它只是在隐藏所有传入的消息。
这也是为什么不应该使用 SmallestMailbox routers!Tbh。我想不出任何场景,在任何类型的路由器后面放置持久的参与者都是一个有效的选择。
发布于 2019-11-05 18:26:35
我认为你需要做的是创建一个角色协调类,然后在里面创建一个角色列表/字典。然后(据我所知),在你告诉协调器新的更新之后,它们应该并行工作。
public class UpdateCoordinator : ReceiveActor
{
//all your update referenced
private readonly Dictionary<int, IActorRef> _updates;
public UpdateCoordinator()
{
_updates = new Dictionary<int, IActorRef>();
//create the update reference
Receive<UpdateMessage>(updateMessage =>
{
//add to the list of actors
CreateUpdateReferenceIfNotExists(updateMessage.Identifier);
IActorRef childUpdateRef = _updates[updateMessage.Identifier];
//start your update actor
childUpdateRef.Tell(updateMessage);
});
}
private void CreateUpdateReferenceIfNotExists(int identifier)
{
if (!_updates.ContainsKey(identifier))
{
IActorRef newChildUpdateRef = Context.ActorOf(Props.Create(()=> new UpdateActor(identifier)), $"Update_{identifier}");
_updates.Add(identifier, newChildUpdateRef);
}
}
}https://stackoverflow.com/questions/53416051
复制相似问题