首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Akka-stream的有状态操作?

使用Akka-stream的有状态操作?
EN

Stack Overflow用户
提问于 2019-09-06 11:21:45
回答 1查看 75关注 0票数 0

我正在尝试弄清楚如何使用akka-stream执行以下有状态操作:

假设我通过流发出一组元素e,其中包含任意一组元素a。

我想根据接收到的元素e的数量表示的元素a的总量,对向下游传递的元素e的数量进行速率限制。

例如4

传入流

--> e1(a1e1)

--> e2(a1e2,a2e2)

--> e3(a1e3)

--> e4(a1e4,a2e4)

--> e5(a1e5,a2e5)

将会发出

group1 e1,e2,e3

group2 e4,e5

最终,这应该像在groupWithin中一样计时。如果有一段时间过去了,那就尽情释放吧。

听起来statefulmapContact可能是值得关注的东西,但我不确定。

如果任何akka-stream专家能在这里提供帮助,那就太棒了。

EN

回答 1

Stack Overflow用户

发布于 2019-09-12 05:09:45

从描述中,我假设您想要控制向下游生产元素的速度。此外,处理每个元素的成本也不同。

控制流速度的开箱即用选项很少。

  1. ,也许你想使用throttle。它控制流的吞吐量。

节流-将吞吐量限制为每个时间单位的特定元素数量,或每个时间单位的特定总成本,其中必须提供一个函数来计算每个元素的单独成本。导入java.time.LocalDateTime

len => E((1 to len).map(_ => 1) .throttle(5,1.second,_.as.size) .runForeach(e => { println(s"${LocalDateTime.now()} -> $e") }) f.onComplete(_ => { mat.shutdown() sys.terminate() }) }

  1. 另一种选择是使用分组流程的组合,例如groupedWeightedWithin(使用元素进行批处理,直到批处理成本或经过的时间)或batchWeighted(如果下游较慢,则进行批处理/聚合)和简单的throttle

groupedWeightedWithin -将此流分成在时间窗口内接收的元素组,或受元素的权重限制,无论最先发生什么。

import java.time.LocalDateTime import import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import scala.concurrent.duration._ import scala.util.Random import object GroupedWithingExample extends { implicit val sys: ActorSystem = ActorSystem() implicit val mat: ActorMaterializer = ActorMaterializer() case class E(as: SeqInt) val f= Source(1 to 20) .map(_ => Random.nextInt(5)).map(len => E((1 to len).map(_ => 1) .groupedWeightedWithin(7,1.second)(_.as.length) .throttle(1,1.second) .runForeach(e => { println(s"${LocalDateTime.now()} -> $e") }) f.onComplete(_ => { mat.shutdown() sys.terminate() }) }

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57815296

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档