我的Akka HTTP应用程序通过服务器发送的事件流一些数据,客户端可以请求的事件比他们所能处理的多得多。代码看起来像这样
complete {
source.filter(predicate.isMatch)
.buffer(1000, OverflowStrategy.dropTail)
.throttle(20, 1 second)
.map { evt => ServerSentEvent(evt) }
}是否有一种方法可以检测到某个阶段的背压,并以某种方式通知客户端,最好是使用相同的接收器(通过发出一种不同的输出),或者如果不可能,只需让Akka框架调用某种类型的回调,通过控制侧通道来处理这个事实?
发布于 2020-04-14 13:00:39
所以,我不太明白你的用例。你是在问.buffer还是.throttle的背压?我的另一部分困惑是,在流已经被压回的情况下,您建议释放一个新的“控制”元素。因此您的控制元素可能会在一段时间内不被接收。另外,如果您每次收到背压时都会发出一个控制元素,那么您很可能会产生大量的控制元素。
构建这一(过于天真)解决方案的一种方法是使用混和。
val simpleSink: Sink[String, Future[Done]] =
Sink.foreach(e => println(s"simple: $e"))
val cycleSource: Source[String, NotUsed] =
Source.cycle(() => List("1", "2", "3", "4").iterator).throttle(5, 1.second)
val conflateFlow: Flow[String, String, NotUsed] =
Flow[String].conflate((a, b) => {
"BACKPRESSURE CONTROL ELEMENT"
})
val backpressureFlow: Flow[String, String, NotUsed] =
Flow[String]
.buffer(10, OverflowStrategy.backpressure) throttle (2, 1.second)
val backpressureTest =
cycleSource.via(conflateFlow).via(backpressureFlow).to(simpleSink).run()要将其转换为更有用的示例,您可以这样做:
.conflate中进行某种类型的调用(然后删除其中一个元素)。但是要小心,不要做任何阻塞的事情。也许只是发送一个信息,这可能会被其他地方复制。不过,我想我必须对用例有更多的了解。看看所有现成的背压感知算子,看看其中一个是否有帮助。
https://stackoverflow.com/questions/61190218
复制相似问题