如何使用Monix任务从一系列任务(可以并行运行)编写单个输出流:例如,我有N个任务可以并行运行,并从HTTP call/server,获得一些响应,我正在将响应写回文件流(就像有限制内存一样),但我得到stream closed error,想知道是否有一种方法可以使用Monix任务或任何其他API来实现这一目标。
java.io.IOException: Stream closed 示例代码片段:
import java.io.{FileOutputStream, OutputStream, OutputStreamWriter}
import monix.eval.Task
import monix.execution.ExecutionModel.AlwaysAsyncExecution
import monix.execution.Scheduler
import scala.util.Random
object TaskTest extends App {
implicit val scheduler = Scheduler(
AlwaysAsyncExecution
)
val list = List(1 to 100)
val filePath = System.currentTimeMillis() + "_test.txt"
val outputStreamWriter = new OutputStreamWriter(new FileOutputStream(filePath))
val futureTask = autoClose(outputStreamWriter) {
outputStream =>
val totalCount = SeqOfTasks(outputStream)
totalCount.map(_.sum)
}
futureTask.runToFuture.onComplete {
case scala.util.Success(value) =>
println(s"fetched a total of $value")
case scala.util.Failure(exception) =>
println("there is some error occurred ")
println(exception)
}
//close this stream when all tasks completed.
def autoClose[A <: AutoCloseable, B](resource: A)(code: A ⇒ B): B = {
try code(resource)
finally resource.close()
}
def SeqOfTasks(outPutStream: OutputStreamWriter): Task[Seq[Int]] = {
val list = List(0 to 10: _*)
Task.wanderUnordered(list)(l => makeHttpCall(outPutStream, l))
}
// each taks will fetch some data and write to single out put stream that is shared by other tasks.
def makeHttpCall(outputStream: OutputStreamWriter, value: Int): Task[Int] = {
Task {
val result = getDummyValue(value)
val bytes = result.mkString("\t") + "\n"
outputStream.write(bytes)
//return some value
result.size
}
}
//return a dummy response for http
def getDummyValue(n: Int): Seq[Int] = {
println(s"dummy value : $n")
Seq.fill(n)(Random.nextInt)
}发布于 2019-06-24 15:25:02
这个问题是由于Task是延迟和异步的,但是autoClose两者都不是。
解决方案是使autoClose任务意识到:
def autoClose[A <: AutoCloseable, B](resource: A)(code: A ⇒ Task[B]): Task[B] = {
code(resource).guarantee(Task(resource.close()))
}注意:这是由 奥列格·皮日科夫 on Monix Gitter聊天回答的。
https://stackoverflow.com/questions/56678588
复制相似问题