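I have an Akka Streams pipeline that exports events to streams via a BroadcastHub (fed by a SourceQueueWithComplete).
Even though every downstream consumer inserts a .buffer() (which I hoped would keep the upstream buffers of the hub and the queue drained), I still observe backpressure after the system has been running for a while.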
Here is a (simplified) snippet:
```scala
import akka.NotUsed
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import scala.concurrent.ExecutionContext
import scala.util.Failure

class NotificationHub[Event](
    implicit materializer: Materializer,
    ecForLogging: ExecutionContext
) {
  // a SourceQueue to enqueue events and a BroadcastHub to allow multiple subscribers
  private val (queue, broadCastSource) =
    Source.queue[Event](
      bufferSize = 64,
      // we expect the buffer to never run full and if it does, we want
      // to log that asap, so we use OverflowStrategy.backpressure
      OverflowStrategy.backpressure
    ).toMat(BroadcastHub.sink)(Keep.both).run()

  // This keeps the BroadcastHub drained while there are no subscribers
  // (see https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html):
  broadCastSource.to(Sink.ignore).run()

  def notificationSource(p: Event => Boolean): Source[Unit, NotUsed] = {
    broadCastSource
      .collect { case event if p(event) => () }
      // this buffer is intended to keep the upstream buffers of
      // queue and hub drained:
      .buffer(
        // if a downstream consumer ever becomes too slow to consume,
        // only the latest two notifications are relevant
        size = 2,
        // doesn't really matter whether we drop head or tail
        // as all elements are the same (), it's just important not
        // to backpressure in case of overflow:
        OverflowStrategy.dropHead
      )
  }

  def propagateEvent(event: Event): Unit = {
    queue.offer(event).onComplete {
      case Failure(e) =>
        // unexpected backpressure occurred!
        println(e.getMessage)
        e.printStackTrace()
      case _ =>
        ()
    }
  }
}
```

Since the docs for buffer() say that with OverflowStrategy.dropHead it never backpressures, I expected the upstream buffers to stay drained. However, I still end up with failed queue.offer() calls caused by backpressure.
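Reasons I can think of:

- Evaluating the predicate p in .collect causes so much load that it leads to backpressure. This seems unlikely, since it is a very simple, non-blocking operation.

Am I missing something? Do I need to add an asynchronous boundary with .async before or after .buffer() to fully decouple the "hub" from heavy load that may occur somewhere downstream?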
Posted on 2022-07-25 12:47:08
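So, after reading more of the Akka docs and some experimenting, I think I found the solution (sorry for asking too early).
To fully decouple my code from any heavy load that might occur somewhere downstream, I need to make sure that no downstream code is executed by the same actor as the .buffer() (e.g. by inserting .async).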
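For example, this code eventually causes the SourceQueue to fill up completely and then backpressure:

```scala
val hub: NotificationHub[Int] = // ...
hub.notificationSource(_ => true)
  .map { x =>
    Thread.sleep(250)
    x
  }
```

Further inspection showed that this .map() is executed on the same thread (the same underlying actor) as the upstream .collect() (and .buffer()). This is Akka Streams' operator fusion: by default, consecutive stages are fused into a single actor, so the blocking Thread.sleep also stalls the buffer, which then cannot pull (and drop) elements from the hub.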
As shown below, with .async inserted, .buffer() drops elements (as I had hoped) and the upstream SourceQueue stays drained:
```scala
val hub: NotificationHub[Int] = // ...
hub.notificationSource(_ => true)
  .async
  .map { x =>
    Thread.sleep(250)
    x
  }
```

https://stackoverflow.com/questions/73080970
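Instead of relying on every subscriber to remember the `.async`, the hub itself could end `notificationSource` with the boundary. The sketch below shows that design choice as a standalone function (`DecoupledSubscription` is a hypothetical name) that takes the hub's broadcast source as a parameter. One caveat, assuming Akka's defaults: the async boundary adds an input buffer on the downstream side (up to 16 elements by default, via `akka.stream.materializer.max-input-buffer-size`), so a slow subscriber may still see more than "only the latest two" notifications before the dropHead buffer starts dropping.

```scala
import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

object DecoupledSubscription {
  def notificationSource[Event](
      broadcastSource: Source[Event, NotUsed],
      p: Event => Boolean
  ): Source[Unit, NotUsed] =
    broadcastSource
      .collect { case event if p(event) => () }
      // never backpressures: evicts the oldest element when full
      .buffer(2, OverflowStrategy.dropHead)
      // async boundary: stages the subscriber attaches downstream run in a
      // separate actor, so blocking there cannot stall this buffer
      .async
}
```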