我目前正致力于实现对API的客户端http请求,并决定探索用于此任务的sttp & monix。由于我对Monix还不熟悉,所以我仍然不知道如何运行任务并检索它们的结果。我的目标是获得一系列http请求结果,我可以在并行->解析->加载中调用这些结果。
下面是我到目前为止尝试过的一个片段:
import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
object SO extends App {
val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
.header("accept", "application/json")
.response(asString)
.body()
.send()
val tasks = Seq(r1).map(i => Task(i))
Task.parSequenceUnordered(tasks).guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
postTask.runToFuture.foreach(println) // prints: List(Task.FlatMap$2052527361)
}我的困惑相当简单(我猜)。如何运行我创建的Task.parSequenceUnordered,并处理(解析http结果)序列中的任务?
很高兴有:出于好奇,在处理任务序列请求时,是否可以天真地引入限速/节流?我并不是真的想要建造复杂的东西。它可以很简单,只需间隔出几批请求。不知道Monix是否已经有了帮手。
发布于 2020-08-05 13:33:36
感谢奥列格·皮日科夫和monix gitter群落帮助我解决这个问题。
这里引用奥列格的话:
由于您已经在使用带有monix支持的后端,所以r1的类型是
Task[Response[Either[String,String]]]。因此,当您执行Seq(r1).map(i => Task(i))时,您将它变成一个任务序列,除了提供给您结果的其他任务(类型为Seq[Task[Task[Response[...]]]])之外,其他任务不会执行任何操作。然后,您的代码将外层、任务--即赋予任务--并行化,然后您将得到作为结果开始的任务。您只需要处理Seq(r1)才能并行运行请求。 如果您正在使用Intellij,您可以按Alt + =键查看选择的类型--如果不能单独区分类型和代码(但通过经验它会变得更好)。 至于速率限制,我们有parSequenceN,允许您对并行性设置一个限制。请注意,无序只意味着以输出中随机顺序的结果为代价获得轻微的性能优势,无论如何,它们都是不确定地执行的。
最后,我得到了一个(简化的)实现,如下所示:
import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
object SO extends App {
val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
.header("accept", "application/json")
.response(asString)
.body()
.send()
val items = Seq(r1.map(x => x.body))
Task.parSequenceN(1)(items).guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
postTask.runToFuture.foreach(println)
}https://stackoverflow.com/questions/63263276
复制相似问题