我正在尝试弄清楚如何使用akka-stream执行以下有状态操作:
假设我通过流发出一组元素e,其中包含任意一组元素a。
我想根据接收到的元素e的数量表示的元素a的总量,对向下游传递的元素e的数量进行速率限制。
例如4
传入流
--> e1(a1e1)
--> e2(a1e2,a2e2)
--> e3(a1e3)
--> e4(a1e4,a2e4)
--> e5(a1e5,a2e5)
将会发出
group1 e1,e2,e3
group2 e4,e5
最终,这应该像在groupWithin中一样计时。如果有一段时间过去了,那就尽情释放吧。
听起来statefulmapContact可能是值得关注的东西,但我不确定。
如果任何akka-stream专家能在这里提供帮助,那就太棒了。
发布于 2019-09-12 05:09:45
从描述中,我假设您想要控制向下游生产元素的速度。此外,处理每个元素的成本也不同。
控制流速度的开箱即用选项很少。
,
throttle。它控制流的吞吐量。节流-将吞吐量限制为每个时间单位的特定元素数量,或每个时间单位的特定总成本,其中必须提供一个函数来计算每个元素的单独成本。导入java.time.LocalDateTime
len => E((1 to len).map(_ => 1) .throttle(5,1.second,_.as.size) .runForeach(e => { println(s"${LocalDateTime.now()} -> $e") }) f.onComplete(_ => { mat.shutdown() sys.terminate() }) }
groupedWeightedWithin(使用元素进行批处理,直到批处理成本或经过的时间)或batchWeighted(如果下游较慢,则进行批处理/聚合)和简单的throttle。groupedWeightedWithin -将此流分成在时间窗口内接收的元素组,或受元素的权重限制,无论最先发生什么。
import java.time.LocalDateTime import import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import scala.concurrent.duration._ import scala.util.Random import object GroupedWithingExample extends { implicit val sys: ActorSystem = ActorSystem() implicit val mat: ActorMaterializer = ActorMaterializer() case class E(as: SeqInt) val f= Source(1 to 20) .map(_ => Random.nextInt(5)).map(len => E((1 to len).map(_ => 1) .groupedWeightedWithin(7,1.second)(_.as.length) .throttle(1,1.second) .runForeach(e => { println(s"${LocalDateTime.now()} -> $e") }) f.onComplete(_ => { mat.shutdown() sys.terminate() }) }
https://stackoverflow.com/questions/57815296
复制相似问题