我目前正在学习和玩STTP使用Monix后端。在处理完所有请求(每个请求都是一项任务)之后,我主要是关闭后端。
我创建了示例/模拟代码以类似于我的问题(据我理解,我的问题更一般,而不是特定于我的代码):
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt
object ObservableTest extends App {
val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
data.guarantee(backend.close()) // If I close the backend here, I can' generate requests after (when processing the actual requests in the list)
// I have attempted to return a Task containing a tuple of (data, backend) but closing the backend from outside of the scope did not work as I expected
}
import monix.execution.Scheduler.Implicits.global
val obs = Observable
.fromTask(activities)
.flatMap { listOfFetches =>
Observable.fromIterable(listOfFetches)
}
.throttle(3 second, 1)
.map(_.runToFuture)
obs.subscribe()
}我的fetch (api调用生成器)函数看起来如下所示:
def fetch(uri: Uri, auth: String)(implicit
backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
) = {
println(uri)
val task = basicRequest
.get(uri)
.header("accept", "application/json")
.header("Authorization", auth)
.response(asString)
.send()
task
}由于我的主要任务包含其他任务(稍后我需要处理这些任务),我需要找到一种从外部关闭Monix后端的替代方法。在我使用List[Task[Response[Either[String, String]]]]中的请求之后,是否有一种干净的方法来关闭后端?
发布于 2020-08-11 07:42:47
问题来自这样一个事实:在sttp后端打开时,您正在计算要执行的任务列表-- List[Task[Response[Either[String, String]]]],但您没有运行它们。因此,我们需要在后端关闭之前对运行这些任务进行排序。
这里要做的关键是创建一个任务的单一描述,在后端仍处于打开状态时运行所有这些请求。
一旦计算了data (它本身就是一个任务--一个计算的描述--运行时生成一个任务列表--也是计算的描述),我们需要将它转换为一个单一的、非嵌套的Task。这可以通过多种方式完成(例如,使用简单的排序),但在您的例子中,这将使用Observable
AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] =
ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
val activities = Observable
.fromTask(data)
.flatMap { listOfFetches =>
Observable.fromIterable(listOfFetches)
}
.throttle(3 second, 1)
.mapEval(identity)
.completedL
activities.guarantee(
backend.close()
)
}首先请注意,Observable.fromTask(...)位于最外层的flatMap中,所以是在后端仍然打开时创建的。我们创建可观察的、节流等,然后是一个关键的事实:一旦我们有了节流,我们就会使用mapEval评估每一项(每一项都是Task[...] --一种如何发送na http请求的描述)。我们得到一个Either[String, String]流,这是请求的结果。
最后,我们使用Task (丢弃结果)将流转换为.completedL,直到整个流完成。
然后关闭后端对最后一项任务进行排序。如上文所述,将发生的副作用的最后顺序是:
backend
data)
data
)
https://stackoverflow.com/questions/63329748
复制相似问题