首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何设计具有扇出能力的反应式流反应器系统

如何设计具有扇出能力的反应式流反应器系统
EN

Stack Overflow用户
提问于 2016-07-18 22:34:20
回答 1查看 329关注 0票数 1

我正在尝试实现一个具有背压功能的基于角色的系统。作为要求,主进程接收JSON格式的流数据。但是,每个JSON事件都有几个字段,例如{ip:'123.43.12.1',country:'US',...等。JSON的结构是预先知道的。

现在,我必须以某种方式将JSON结构展平为(键、值)对。例如,可以将上述数据展平为( ip,freq),(country,freq),其中freq是ip(例如'123.43.12.1')在数据流中出现的次数。

一种非常自然的方式是将每个(键、值)对转发给相应的子/远程参与者进行进一步评估。例如,('123.43.12.1',1)发送到IP-Actor;('US',1)发送到Country-Actor,依此类推。

我想确保整个系统都有背压。在这种情况下,情况更为复杂,因为只有在IP-Actor和Country-Actor都已完成处理展平对('123.43.12.1',1),('US',1)的情况下,事件{ip:'123.43.12.1',country:'US'}才被视为已处理。每个参与者可能具有不同的处理速度(例如,IP-Actor比Country- actor快得多)。在这种情况下,我希望接收到流的主进程将等待/阻塞,直到有请求信号(当两个参与者完成处理其邮箱中的现有数据时发生)。否则,一些参与者可能会在邮箱(Country-Actor - slow )中充满消息,但消息仍会进入,因为其他参与者邮箱是空的(IP-Actor - faster )。

有没有人能建议一下reactive-stream规范是否提供了这样的功能。如果没有,有没有办法以最有效的方式实现这些功能。

谢谢。

EN

回答 1

Stack Overflow用户

发布于 2016-07-20 19:40:16

您所描述的Actor之间的同步类型正是您希望在Actor Model中避免的类型。任何“等待/阻塞”都是反应式编程的对立面。我建议使用单流Flow进行更新。

您首先需要处理json数据:

代码语言:javascript
复制
import akka.stream.scaladsl._

//your original source of json strings
val jsonSrc : Source[String, NotUsed] = ???

case class JsonObject(ip : String, country : String)

//use your favorite json parser
def jsonParser(jsonStr : String) : JsonObject = ???

val parserFlow = Flow[String] map jsonParser

接下来,定义计数器逻辑并使用Flow.scan生成递增值的计数器:

代码语言:javascript
复制
type IPCounter = Map[String,Int]
val emptyIPCounter = Map.empty[String,Int] withDefaultValue 0

type CountryCounter = Map[String, Int]
val emptyCountryCounter = Map.empty[String,Int] withDefaultValue 0

type Counters = Tuple2[IPCounter, CountryCounter]
val emptyCounters = (emptyIPCounter, emptyCountryCounter)

def updateCounters(counters : Counters, jsonObj : JsonObject) : Counters = {
  (counters._1.updated(jsonObj.ip, counters._1(jsonObj.ip) + 1),
   counters._2.updated(jsonObj.country, counters._2(jsonObj.country) + 1))
}

val counterFlow = Flow[JsonObject].scan(emptyCounters)(updateCounters)

最后,将所有内容组合在一起:

代码语言:javascript
复制
val counterSource : Source[Counters, NotUsed] = jsonSrc via parserFlow via counterFlow

结果就是您所要求的:一个反压流,当所有计数器都已更新时,它只向前传递计数器值。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38439411

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档