我正在尝试实现一个具有背压功能的基于角色的系统。作为要求,主进程接收JSON格式的流数据。但是,每个JSON事件都有几个字段,例如{ip:'123.43.12.1',country:'US',...等。JSON的结构是预先知道的。
现在,我必须以某种方式将JSON结构展平为(键、值)对。例如,可以将上述数据展平为( ip,freq),(country,freq),其中freq是ip(例如'123.43.12.1')在数据流中出现的次数。
一种非常自然的方式是将每个(键、值)对转发给相应的子/远程参与者进行进一步评估。例如,('123.43.12.1',1)发送到IP-Actor;('US',1)发送到Country-Actor,依此类推。
我想确保整个系统都有背压。在这种情况下,情况更为复杂,因为只有在IP-Actor和Country-Actor都已完成处理展平对('123.43.12.1',1),('US',1)的情况下,事件{ip:'123.43.12.1',country:'US'}才被视为已处理。每个参与者可能具有不同的处理速度(例如,IP-Actor比Country- actor快得多)。在这种情况下,我希望接收到流的主进程将等待/阻塞,直到有请求信号(当两个参与者完成处理其邮箱中的现有数据时发生)。否则,一些参与者可能会在邮箱(Country-Actor - slow )中充满消息,但消息仍会进入,因为其他参与者邮箱是空的(IP-Actor - faster )。
有没有人能建议一下reactive-stream规范是否提供了这样的功能。如果没有,有没有办法以最有效的方式实现这些功能。
谢谢。
发布于 2016-07-20 19:40:16
您所描述的Actor之间的同步类型正是您希望在Actor Model中避免的类型。任何“等待/阻塞”都是反应式编程的对立面。我建议使用单流Flow进行更新。
您首先需要处理json数据:
import akka.stream.scaladsl._
//your original source of json strings
val jsonSrc : Source[String, NotUsed] = ???
case class JsonObject(ip : String, country : String)
//use your favorite json parser
def jsonParser(jsonStr : String) : JsonObject = ???
val parserFlow = Flow[String] map jsonParser接下来,定义计数器逻辑并使用Flow.scan生成递增值的计数器:
type IPCounter = Map[String,Int]
val emptyIPCounter = Map.empty[String,Int] withDefaultValue 0
type CountryCounter = Map[String, Int]
val emptyCountryCounter = Map.empty[String,Int] withDefaultValue 0
type Counters = Tuple2[IPCounter, CountryCounter]
val emptyCounters = (emptyIPCounter, emptyCountryCounter)
def updateCounters(counters : Counters, jsonObj : JsonObject) : Counters = {
(counters._1.updated(jsonObj.ip, counters._1(jsonObj.ip) + 1),
counters._2.updated(jsonObj.country, counters._2(jsonObj.country) + 1))
}
val counterFlow = Flow[JsonObject].scan(emptyCounters)(updateCounters)最后,将所有内容组合在一起:
val counterSource : Source[Counters, NotUsed] = jsonSrc via parserFlow via counterFlow结果就是您所要求的:一个反压流,当所有计数器都已更新时,它只向前传递计数器值。
https://stackoverflow.com/questions/38439411
复制相似问题