我正在尝试构建一个Akka ,它通过发出Future API调用来接收数据( API的本质是滚动,它递增地获取结果)。为了构建这样的Source,我正在使用GraphStage。
我修改了NumberSource实例,它一次简单地推送一个Int。我所做的唯一改变就是用Int替换为getvalue(): Future[Int] (以模拟API调用):
class NumbersSource extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("NumbersSource")
override val shape: SourceShape[Int] = SourceShape(out)
// simple example of future API call
private def getvalue(): Future[Int] = Future.successful(Random.nextInt())
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = {
// Future API call
getvalue().onComplete{
case Success(value) =>
println("Pushing value received..") // this is currently being printed just once
push(out, counter)
case Failure(exception) =>
}
}
}
})
}
}
// Using the Source and Running the stream
val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource
val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)
val done: Future[Done] = mySource.runForeach{
num => println(s"Received: $num") // This is currently not printed
}
done.onComplete(_ => system.terminate())以上代码不起作用。setHandler中的println语句只执行一次,没有任何东西被推到下游。
这类未来电话应如何处理?谢谢。
更新
我试图通过如下更改来使用getAsyncCallback:
class NumbersSource(futureNum: Future[Int]) extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("NumbersSource")
override val shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
override def preStart(): Unit = {
val callback = getAsyncCallback[Int] { (_) =>
completeStage()
}
futureNum.foreach(callback.invoke)
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
val value: Int = ??? // How to get this value ??
push(out, value)
}
})
}
}
// Using the Source and Running the Stream
def random(): Future[Int] = Future.successful(Random.nextInt())
val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource(random())
val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)
val done: Future[Done] = mySource.runForeach{
num => println(s"Received: $num") // This is currently not printed
}
done.onComplete(_ => system.terminate())但是,现在我被困在如何获取从未来计算出来的价值。对于GraphStage,Flow,我可以使用:
val value = grab(in) // where in is Inlet of a Flow但是,我有一个GraphStage,Source,所以我不知道如何获取上面计算的未来的Int值。
发布于 2021-05-24 19:40:45
我不确定我是否正确理解,但是如果您试图用Future中计算的元素实现无限源,那么就没有必要使用您自己的GraphStage来实现它。您可以简单地如下所示:
Source.repeat(())
.mapAsync(parallelism) { _ => Future.successful(Random.nextInt()) }Source.repeat(())只是一些任意值的无限源(在本例中为Unit类型,但您可以将()更改为任何您想要的,因为这里忽略了它)。然后使用mapAsync将异步计算集成到流中。
发布于 2021-06-09 17:12:25
我会加入到另一个答案,试图避免创建您自己的图形阶段。经过一些实验,这似乎对我有用:
type Data = Int
trait DbResponse {
// this is just a callback for a compact solution
def nextPage: Option[() => Future[DbResponse]]
def data: List[Data]
}
def createSource(dbCall: DbResponse): Source[Data, NotUsed] = {
val thisPageSource = Source.apply(dbCall.data)
val nextPageSource = dbCall.nextPage match {
case Some(dbCallBack) => Source.lazySource(() => Source.future(dbCallBack()).flatMapConcat(createSource))
case None => Source.empty
}
thisPageSource.concat(nextPageSource)
}
val dataSource: Source[Data, NotUsed] = Source
.future(???: Future[DbResponse]) // the first db call
.flatMapConcat(createSource)我尝试了它,它几乎完美地工作,我找不出原因,但第二页是即时请求,但其余的将如预期的工作(在背压和什么不工作)。
https://stackoverflow.com/questions/67675769
复制相似问题