首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Akka :处理GraphStage内部的未来

Akka :处理GraphStage内部的未来
EN

Stack Overflow用户
提问于 2021-05-24 16:34:43
回答 2查看 364关注 0票数 1

我正在尝试构建一个Akka ,它通过发出Future API调用来接收数据( API的本质是滚动,它递增地获取结果)。为了构建这样的Source,我正在使用GraphStage

我修改了NumberSource实例,它一次简单地推送一个Int。我所做的唯一改变就是用Int替换为getvalue(): Future[Int] (以模拟API调用):

代码语言:javascript
复制
class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  // simple example of future API call
  private def getvalue(): Future[Int] = Future.successful(Random.nextInt())


  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
            // Future API call
            getvalue().onComplete{
              case Success(value) =>
                println("Pushing value received..") // this is currently being printed just once
                push(out, counter)
              case Failure(exception) =>
            }
          }
        }
      })
    }
}

// Using the Source and Running the stream

  val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource
  val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)

  val done: Future[Done] = mySource.runForeach{
    num => println(s"Received: $num") // This is currently not printed
  }
  done.onComplete(_ => system.terminate())

以上代码不起作用。setHandler中的println语句只执行一次,没有任何东西被推到下游。

这类未来电话应如何处理?谢谢。

更新

我试图通过如下更改来使用getAsyncCallback

代码语言:javascript
复制
class NumbersSource(futureNum: Future[Int]) extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      override def preStart(): Unit = {
        val callback = getAsyncCallback[Int] { (_) =>
          completeStage()
        }
        futureNum.foreach(callback.invoke)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          val value: Int = ??? // How to get this value ??
          push(out, value)
        }
      })
    }
}

// Using the Source and Running the Stream

def random(): Future[Int] = Future.successful(Random.nextInt())

  val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource(random())
val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)

  val done: Future[Done] = mySource.runForeach{
    num => println(s"Received: $num") // This is currently not printed
  }
  done.onComplete(_ => system.terminate())

但是,现在我被困在如何获取从未来计算出来的价值。对于GraphStageFlow,我可以使用:

代码语言:javascript
复制
val value = grab(in) // where in is Inlet of a Flow

但是,我有一个GraphStageSource,所以我不知道如何获取上面计算的未来的Int值。

EN

回答 2

Stack Overflow用户

发布于 2021-05-24 19:40:45

我不确定我是否正确理解,但是如果您试图用Future中计算的元素实现无限源,那么就没有必要使用您自己的GraphStage来实现它。您可以简单地如下所示:

代码语言:javascript
复制
Source.repeat(())
    .mapAsync(parallelism) { _ => Future.successful(Random.nextInt()) }

Source.repeat(())只是一些任意值的无限源(在本例中为Unit类型,但您可以将()更改为任何您想要的,因为这里忽略了它)。然后使用mapAsync将异步计算集成到流中。

票数 0
EN

Stack Overflow用户

发布于 2021-06-09 17:12:25

我会加入到另一个答案,试图避免创建您自己的图形阶段。经过一些实验,这似乎对我有用:

代码语言:javascript
复制
type Data = Int

trait DbResponse {
  // this is just a callback for a compact solution
  def nextPage: Option[() => Future[DbResponse]] 
  def data: List[Data]
}

def createSource(dbCall: DbResponse): Source[Data, NotUsed] = {
  val thisPageSource = Source.apply(dbCall.data)
  val nextPageSource = dbCall.nextPage match {
    case Some(dbCallBack) => Source.lazySource(() => Source.future(dbCallBack()).flatMapConcat(createSource))
    case None             => Source.empty
  }
  thisPageSource.concat(nextPageSource)
}

val dataSource: Source[Data, NotUsed] = Source
   .future(???: Future[DbResponse]) // the first db call
   .flatMapConcat(createSource)

我尝试了它,它几乎完美地工作,我找不出原因,但第二页是即时请求,但其余的将如预期的工作(在背压和什么不工作)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67675769

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档