我有一个Actor,它被设计为与akka-io acking一起工作,这样当它向上游(向网络)发送消息时,它将等待Ack。此参与者是后端异步应用程序的接口。
我希望有一个包装层,它允许我将这个Actor转换成一个akka流Flow[Incoming, Outgoing, ???],这样它就可以与预期有这样一个签名的新库集成起来。
(来自上游的消息很少,所以我们不太在意向后施压,但拥有它并不是一件坏事。)
sealed trait Incoming //... with implementations
sealed trait Outgoing //... with implementations
object Ack
// `upstream` is an akka-io connection actor that will send Ack
// when it writes an Outgoing message to the socket
class SimpleActor(upstream: Actor) extends Actor {
def receive = {
case in: Incoming if sender() == upstream =>
// does some work in response to upstream
case other =>
// does some work in response to downstream
// including sending messages to upstream and
// `becoming` a stashing state waiting for Ack
// to `unbecome`, then sending Ack downstream
// (which will respect the backpressure).
}
}我从akka用户邮件列表中获得了良好的权威,即akka流中没有将参与者与流集成在一起的代码,为了将Actor插入到流中并保持基于Ack的背压,必须实现PushPullStage。
看来我们需要两个PushPullStage.一个用于upstream => SimpleActor,另一个用于SimpleActor => upstream。
我的问题是:
PushPullStage更简单的方法吗?发布于 2015-05-31 10:49:52
我认为阿克卡流的理念是提供低水平的砖块,并在其之上建造更高级的工具。如果您查看我们最近发布的开放源码库https://github.com/MfgLabs/akka-stream-extensions,您会发现我们确实做到了这一点。我们提供了一些有用的结构,以便更容易地管理速率限制器、状态处理器、延迟和生成器等。对于演员集成,我认为应该有可能创建某种类型的帮手,以便更容易地将演员与akka流试图传播背压的角色集成起来。阿克卡流还很年轻,生态系统还在不断发展;)
发布于 2015-05-27 14:00:08
是的,你可以把演员和流结合起来。
为此目的,有一些特殊的行为者:演员发行者和演员订阅者。
都在这里:http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html
当然,你必须用这样的方式来写演员,这样才能在流的背压下工作。但你不需要推拉台。
https://stackoverflow.com/questions/30479777
复制相似问题