首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何运行Monix的parSequenceUnordered并处理每个任务的结果?

如何运行Monix的parSequenceUnordered并处理每个任务的结果?
EN

Stack Overflow用户
提问于 2020-08-05 10:31:43
回答 1查看 272关注 0票数 2

我目前正致力于实现对API的客户端http请求,并决定探索用于此任务的sttp & monix。由于我对Monix还不熟悉,所以我仍然不知道如何运行任务并检索它们的结果。我的目标是获得一系列http请求结果,我可以在并行->解析->加载中调用这些结果。

下面是我到目前为止尝试过的一个片段:

代码语言:javascript
复制
import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task

object SO extends App {

  val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
      .header("accept", "application/json")
      .response(asString)
      .body()
      .send()

    val tasks = Seq(r1).map(i => Task(i))
    Task.parSequenceUnordered(tasks).guarantee(backend.close())
  }
  
  import monix.execution.Scheduler.Implicits.global

  postTask.runToFuture.foreach(println) // prints: List(Task.FlatMap$2052527361)
}

我的困惑相当简单(我猜)。如何运行我创建的Task.parSequenceUnordered,并处理(解析http结果)序列中的任务?

很高兴有:出于好奇,在处理任务序列请求时,是否可以天真地引入限速/节流?我并不是真的想要建造复杂的东西。它可以很简单,只需间隔出几批请求。不知道Monix是否已经有了帮手。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-08-05 13:33:36

感谢奥列格·皮日科夫monix gitter群落帮助我解决这个问题。

这里引用奥列格的话:

由于您已经在使用带有monix支持的后端,所以r1的类型是Task[Response[Either[String,String]]]。因此,当您执行Seq(r1).map(i => Task(i))时,您将它变成一个任务序列,除了提供给您结果的其他任务(类型为Seq[Task[Task[Response[...]]]])之外,其他任务不会执行任何操作。然后,您的代码将外层、任务--即赋予任务--并行化,然后您将得到作为结果开始的任务。您只需要处理Seq(r1)才能并行运行请求。 如果您正在使用Intellij,您可以按Alt + =键查看选择的类型--如果不能单独区分类型和代码(但通过经验它会变得更好)。 至于速率限制,我们有parSequenceN,允许您对并行性设置一个限制。请注意,无序只意味着以输出中随机顺序的结果为代价获得轻微的性能优势,无论如何,它们都是不确定地执行的。

最后,我得到了一个(简化的)实现,如下所示:

代码语言:javascript
复制
import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task

object SO extends App {

  val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
      .header("accept", "application/json")
      .response(asString)
      .body()
      .send()

    val items = Seq(r1.map(x => x.body))
    Task.parSequenceN(1)(items).guarantee(backend.close())
  }
  
  import monix.execution.Scheduler.Implicits.global

   postTask.runToFuture.foreach(println)
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63263276

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档