首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何对来自无限流的传入事件进行分组?

如何对来自无限流的传入事件进行分组?
EN

Stack Overflow用户
提问于 2015-11-21 07:30:58
回答 3查看 1.8K关注 0票数 5

我有无限的事件流:

代码语言:javascript
复制
(timestamp, session_uid, traffic)

代码语言:javascript
复制
...
(1448089943, session-1, 10)
(1448089944, session-1, 20)
(1448089945, session-2, 50)
(1448089946, session-1, 30)
(1448089947, session-2, 10)
(1448089948, session-3, 10)
...

这些事件我想按session_uid分组,并计算每个会话的流量之和。

我编写了一个akka-streams流,它可以很好地处理有限的流,使用groupBy (我的代码基于食谱中的示例)。但是对于无限流,它将无法工作,因为groupBy函数应该处理所有传入流,然后才准备返回结果。

我认为我应该实现带超时的分组,也就是说,如果在最后5分钟内没有接收到指定stream_uid的事件,我应该返回这个session_uid的分组事件。但是,如何使用akka-streams来实现它呢?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2015-11-21 20:02:20

我想出了一个有点吝啬的解决方案,但我认为它可以完成任务。

其基本思想是使用源的keepAlive方法作为触发完成的定时器。

但要做到这一点,我们首先必须对数据进行一些抽象。因此,计时器需要从原始源发送触发器或另一个元组值:

代码语言:javascript
复制
sealed trait Data

object TimerTrigger extends Data
case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data

然后将我们的元组源转换为值的来源。我们仍将使用groupBy进行类似于有限流情况的分组:

代码语言:javascript
复制
val originalSource : Source[(Long, String, Int), Unit] = ???

type IDGroup = (String, Source[Value, Unit]) //uid -> Source of Values for uid

val groupedDataSource : Source[IDGroup, Unit] = 
  originalSource.map(t => Value(t._1, t._2, t._3))
                .groupBy(_.session_uid)

棘手的部分是处理只是元组的分组:(String, Source[Value,Unit])。如果时间已经过去,我们需要计时器通知我们,所以我们需要另一个抽象来知道我们是否还在计算,或者由于超时而完成了计算:

代码语言:javascript
复制
sealed trait Sum {
  val sum : Int
}
case class StillComputing(val sum : Int) extends Sum
case class ComputedSum(val sum : Int) extends Sum

val zeroSum : Sum = StillComputing(0)

现在我们可以把每一组的源头都抽干了。如果值源在keepAlive之后没有生成某些内容,那么TimerTrigger将发送一个timeOut。然后,来自Data的keepAlive模式与原始源的TimerTrigger或新值相匹配:

代码语言:javascript
复制
val evaluateSum : ((Sum , Data)) => Sum = {
  case (runningSum, data) => { 
    data match {
      case TimerTrigger => ComputedSum(runningSum.sum)
      case v : Value    => StillComputing(runningSum.sum + v.traffic)
    }
  }
}//end val evaluateSum

type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid

def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult = 
  idGroup._1 -> idGroup._2.keepAlive(timeOut, () => TimerTrigger)
                          .scan(zeroSum)(evaluateSum)
                          .collect {case c : ComputedSum => c.sum}
                          .runWith(Sink.head)

集合应用于只匹配已完成和的部分函数,因此只有在计时器触发后才能到达Sink。

然后,我们将这个处理程序应用于每一个出现的分组:

代码语言:javascript
复制
val timeOut = FiniteDuration(5, MINUTES)

val sumSource : Source[SumResult, Unit] = 
  groupedDataSource map handleGroup(timeOut)

我们现在有一个(String,Future[Int])的来源,它是session_uid和该id的流量之和的未来。

就像我说的,复杂但符合要求。另外,我不完全确定如果已经分组并超时的uid会发生什么,但是随后会出现一个具有相同uid的新值。

票数 3
EN

Stack Overflow用户

发布于 2015-12-07 17:42:37

这似乎是Source.groupedWithin的用例

代码语言:javascript
复制
def groupedWithin(n: Int, d: FiniteDuration): Source[List[Out], Mat]

“将该流分成一组在时间窗口内接收的元素,或受给定数量的元素的限制,无论首先发生什么。”

这是指向文档的链接

票数 2
EN

Stack Overflow用户

发布于 2015-11-21 11:03:18

也许您可以简单地通过参与者来实现它。

代码语言:javascript
复制
case class SessionCount(name: String)

class Hello private() extends Actor {
  var sessionMap = Map[String, Int]()

  override def receive: Receive = {
    case (_, session: String, _) =>
      sessionMap = sessionMap + (session -> (sessionMap.getOrElse(session, 0) + 1))

    case SessionCount(name: String) => sender() ! sessionMap.get(name).getOrElse(0)
  }
}


object Hello {
  private val actor = ActorSystem.apply().actorOf(Props(new Hello))
  private implicit val timeOver = Timeout(10, TimeUnit.SECONDS)
  type Value = (String, String, String)

  def add(value: Value) = actor ! value

  def count(name:String) = (actor ? SessionCount(name )).mapTo[Int]
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33840868

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档