首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何通过Monix发送HTTP请求?

如何通过Monix发送HTTP请求?
EN

Stack Overflow用户
提问于 2020-08-08 17:08:59
回答 1查看 309关注 0票数 1

在我早期的问题的基础上,并根据阿特姆的见解,我的目标是将get请求发送到给定的url,并使用Monix的节流特性来划分请求(以避免访问速率限制)。

预期的工作流如下所示:

代码语言:javascript
复制
make 1 (or more) api call(s) -> apply back-pressure/pausing (based on throttle) -> make the next request -> so on and so forth..

到目前为止,我已经尝试过这样做(下面是我实际代码的一个简化片段):

代码语言:javascript
复制
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt

object ObservableTest extends App {

  val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val ids: Task[List[Int]] = Task { (1 to 3).toList }
    val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
    val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
    data.guarantee(backend.close())
  }

  import monix.execution.Scheduler.Implicits.global

  val flat: Unit = activities.runToFuture.foreach { x =>
    val r: List[Task[Response[Either[String, String]]]] = x // List with size 3
    Observable
      .fromIterable(r)
      .throttle(6 second, 1)
      .map(_.runToFuture)
      .subscribe()
  }
  while (true) {}
}

获取数据的函数是这样的:

代码语言:javascript
复制
  def fetch(uri: Uri, auth: String)(implicit
      backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
  ) = {
    println(uri)
    val task = basicRequest
      .get(uri)
      .header("accept", "application/json")
      .header("Authorization", auth)
      .response(asString)
      .send()

    task
  }

我尝试过运行上述代码,我仍然看到所有get请求都是在没有间隔的情况下触发的。

为了举例说明,我当前的api调用日志如下所示:

代码语言:javascript
复制
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)

我正努力实现类似的目标:

代码语言:javascript
复制
//(https://mr.foos.api/v1), Sat Aug 08 18:50:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:18 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:21 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:24 CEST 2020)

更新:

  • 我已经将api设置为可以使用采煤机进行模拟。在我看来,print语句是从调用函数生成的,但是请求并不是实际发送的。我还更新了函数调用,使其解析为字符串(只是为了简单起见),但是,当我试图将请求限制到模拟api时,它仍然没有接收到任何请求。
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-08-08 18:36:09

所以,如果我理解得对,你有这样的类型:

代码语言:javascript
复制
object ObservableTest extends App  {
  type Response = Either[ResponseError[Error], Activities]
  case class Activities()
  val activities: Task[List[Response]] = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    def fetchData(uri: String): Task[Response] = ???
    val ids: Task[List[Int]] = ??? // a Task containing a List of IDs (from a previous step)
    val func: String => Task[Response] = (i: String) => fetchData("someUri") // a function that will be used to call an endpoint
    val data: Task[List[Task[Response]]] = ids map (_ map (id => func(id.toString))) // Maps API calling-function to the ids
    val activitiesData: Task[List[Response]] = data.flatMap(Task.parSequenceUnordered(_)) // Flattenned the previous step
    activitiesData.guarantee(backend.close())
  }
  import monix.execution.Scheduler.Implicits.global
  Observable(activities)
    .throttle(3 second, 1)
    .subscribe()
}

代码中存在的问题是,您控制了一个包含多个操作的大任务,其中一些操作甚至是并行的(但这不是问题的根源)。即使在类型中,您也可以看到--您应该从任务列表中观察到(每个任务都是节流的),而不是列表中的任务。

实际上,我不知道it来自何处,它可能是评估管道的基石。但是如果我们和他们有任务,就像在例子中一样。我们会这么做的。

代码语言:javascript
复制
import monix.eval.Task
import sttp.client.asynchttpclient.monix._
import monix.eval.Task._
import monix.reactive.Observable
import sttp.client.ResponseError

import scala.concurrent.duration.DurationInt

object ObservableTest extends App  {
  type Response = Either[ResponseError[Error], Activity]
  case class Activity()
  val activities: Task[List[Task[Response]]] = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    def fetchData(uri: String): Task[Response] = Task {
      println("mocked http request")
      Right(Activity())
    }
    val ids: Task[List[Int]] = Task { (1 to 100).toList} // a Task containing a List of IDs (from a previous step)
    val func: Int => Task[Response] = (i: Int) => fetchData(s"someUri_$i") // a function that will be used to call an endpoint
    val data: Task[List[Task[Response]]] = ids.map(_.map(func)) // Maps API calling-function to the ids
    data.guarantee(backend.close())
  }
  import monix.execution.Scheduler.Implicits.global

  Observable.fromTask(activities)
    .flatMap { listOfFetches: List[Task[Response]]  =>
      Observable.fromIterable(listOfFetches)
    }
    .throttle(3.second, 1)
    .map(_.runToFuture) 
    .subscribe()
  
  while(true) {}
}

我们节流了一个抓取的列表,而不是执行所有在里面提取的任务。

PS:请问一些不清楚的问题,我会在代码中添加评论。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63318135

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档