首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >尽管插入了缓冲器,akka-流管道背压

尽管插入了缓冲器,akka-流管道背压
EN

Stack Overflow用户
提问于 2022-07-22 13:15:22
回答 1查看 148关注 0票数 0

我有一个akka流管道,它通过BroadcastHub(通过SourceQueueWithComplete )将事件导出到流中。

尽管所有下游用户都插入了一个.buffer() (我希望它能确保集线器和队列的上游缓冲区保持耗尽),但在系统运行了一段时间之后,我仍然观察到了背压。

下面是一个(简化的)片段:

代码语言:javascript
复制
class NotificationHub[Event](
  implicit materializer: Materializer,
  ecForLogging: ExecutionContext
) {

  // a SourceQueue to enque events and a BroadcastHub to allow multiple subscribers
  private val (queue, broadCastSource) =
    Source.queue[Event](
      bufferSize = 64,
      // we expect the buffer to never run full and if it does, we want
      // to log that asap, so we use OverflowStrategy.backpressure
      OverflowStrategy.backpressure
    ).toMat(BroadcastHub.sink)(Keep.both).run()

  // This keeps the BroadCastHub drained while there are no subscribers
  // (see https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html ):
  broadCastSource.to(Sink.ignore).run()

  def notificationSource(p: Event => Boolean): Source[Unit, NotUsed] = {
    broadCastSource
      .collect { case event if p(event) => () }
      // this buffer is intended to keep the upstream buffers of
      // queue and hub drained:
      .buffer(
        // if a downstream consumer ever becomes too slow to consume,
        // only the latest two notifications are relevant
        size = 2,
        // doesn't really matter whether we drop head or tail
        // as all elements are the same (), it's just important not
        // to backpressure in case of overflow:
        OverflowStrategy.dropHead
      )
  }

  def propagateEvent(
    event: Event
  ): Unit = {
    queue.offer(event).onComplete {
      case Failure(e) =>
        // unexpected backpressure occurred!
        println(e.getMessage)
        e.printStackTrace()
      case _ =>
        ()
    }
  }

}

由于buffer()的医生说,对于DropHead来说,它永远不会产生反压力,所以我本来以为上游的缓冲仍然会耗尽。然而,由于背压,我最终还是没有接到queue.offer()的电话。

我能想到的理由:

  1. .collect中对谓词p的计算会导致大量的负载,从而导致背压。这似乎不太可能,因为这是非常简单的非阻塞操作。
  2. 整体系统完全超载。也不太可能。

我有种感觉我错过了什么吗?我是否需要在.async之前或之后通过buffer()添加一个异步边界,以便将“集线器”与可能在下游某个地方发生的重负载完全解耦?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-07-25 12:47:08

因此,在阅读了更多的akka文档和一些实验之后,我想我找到了解决方案(很抱歉,我问得太早了)。

要将我的代码从可能发生在下游某个地方的任何沉重负载中完全分离出来,我需要确保任何下游代码都不是由与.buffer()相同的参与者执行的(例如,插入.async)。

例如,这段代码最终会导致SourceQueue完全运行,然后向后施加压力:

代码语言:javascript
复制
val hub: NotifactionHub[Int] = // ...
hub.notificationSource(_ => true)
  .map { x =>
    Thread.sleep(250)
    x
  }

进一步的检查表明,这个.map()将与上游.collect() (和.buffer())在同一个线程(底层参与者)上执行。

如下面所示,插入.async时,.buffer()将删除元素(正如我所希望的那样),而上游SourceQueue仍会耗尽:

代码语言:javascript
复制
val hub: NotifactionHub[Int] = // ...
hub.notificationSource(_ => true)
  .async
  .map { x =>
    Thread.sleep(250)
    x
  }
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73080970

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档