首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >发现背压正在发生

发现背压正在发生
EN

Stack Overflow用户
提问于 2020-04-13 14:36:40
回答 1查看 99关注 0票数 0

我的Akka HTTP应用程序通过服务器发送的事件流一些数据,客户端可以请求的事件比他们所能处理的多得多。代码看起来像这样

代码语言:javascript
复制
complete {
  source.filter(predicate.isMatch)
   .buffer(1000, OverflowStrategy.dropTail)
   .throttle(20, 1 second)
   .map { evt => ServerSentEvent(evt) }
}

是否有一种方法可以检测到某个阶段的背压,并以某种方式通知客户端,最好是使用相同的接收器(通过发出一种不同的输出),或者如果不可能,只需让Akka框架调用某种类型的回调,通过控制侧通道来处理这个事实?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-04-14 13:00:39

所以,我不太明白你的用例。你是在问.buffer还是.throttle的背压?我的另一部分困惑是,在流已经被压回的情况下,您建议释放一个新的“控制”元素。因此您的控制元素可能会在一段时间内不被接收。另外,如果您每次收到背压时都会发出一个控制元素,那么您很可能会产生大量的控制元素。

构建这一(过于天真)解决方案的一种方法是使用混和

代码语言:javascript
复制
  val simpleSink: Sink[String, Future[Done]] =
    Sink.foreach(e => println(s"simple: $e"))

  val cycleSource: Source[String, NotUsed] =
    Source.cycle(() => List("1", "2", "3", "4").iterator).throttle(5, 1.second)

  val conflateFlow: Flow[String, String, NotUsed] =
    Flow[String].conflate((a, b) => {
      "BACKPRESSURE CONTROL ELEMENT"
    })

  val backpressureFlow: Flow[String, String, NotUsed] =
    Flow[String]
      .buffer(10, OverflowStrategy.backpressure) throttle (2, 1.second)

  val backpressureTest =
    cycleSource.via(conflateFlow).via(backpressureFlow).to(simpleSink).run()

要将其转换为更有用的示例,您可以这样做:

  1. .conflate中进行某种类型的调用(然后删除其中一个元素)。但是要小心,不要做任何阻塞的事情。也许只是发送一个信息,这可能会被其他地方复制。
  2. 编写一个自定义图阶段。做这样简单的事情不会太困难。

不过,我想我必须对用例有更多的了解。看看所有现成的背压感知算子,看看其中一个是否有帮助。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61190218

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档