首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从并行任务向单个输出流写入Monix

从并行任务向单个输出流写入Monix
EN

Stack Overflow用户
提问于 2019-06-20 03:31:10
回答 1查看 296关注 0票数 0

如何使用Monix任务从一系列任务(可以并行运行)编写单个输出流:例如,我有N个任务可以并行运行,并从HTTP call/server,获得一些响应,我正在将响应写回文件流(就像有限制内存一样),但我得到stream closed error,想知道是否有一种方法可以使用Monix任务或任何其他API来实现这一目标。

代码语言:javascript
复制
java.io.IOException: Stream closed  

示例代码片段:

代码语言:javascript
复制
 import java.io.{FileOutputStream, OutputStream, OutputStreamWriter}

    import monix.eval.Task
    import monix.execution.ExecutionModel.AlwaysAsyncExecution
    import monix.execution.Scheduler

    import scala.util.Random

    object TaskTest extends App {

      implicit val scheduler = Scheduler(
        AlwaysAsyncExecution
      )
      val list = List(1 to 100)

      val filePath = System.currentTimeMillis() + "_test.txt"
      val outputStreamWriter = new OutputStreamWriter(new FileOutputStream(filePath))
      val futureTask = autoClose(outputStreamWriter) {
        outputStream =>
          val totalCount = SeqOfTasks(outputStream)
          totalCount.map(_.sum)
      }

      futureTask.runToFuture.onComplete {
        case scala.util.Success(value) =>
          println(s"fetched a total of $value")

        case scala.util.Failure(exception) =>

          println("there is some error occurred ")
          println(exception)
      }

//close this stream when all tasks completed.
      def autoClose[A <: AutoCloseable, B](resource: A)(code: A ⇒ B): B = {
        try code(resource)
        finally resource.close()
      }


      def SeqOfTasks(outPutStream: OutputStreamWriter): Task[Seq[Int]] = {

        val list = List(0 to 10: _*)
        Task.wanderUnordered(list)(l => makeHttpCall(outPutStream, l))
      }

      // each taks will fetch some data and write to single out put stream that is shared by other tasks.
      def makeHttpCall(outputStream: OutputStreamWriter, value: Int): Task[Int] = {

        Task {
          val result = getDummyValue(value)
          val bytes = result.mkString("\t") + "\n"
          outputStream.write(bytes)
          //return some value
          result.size
        }
      }

      //return a dummy response for http
      def getDummyValue(n: Int): Seq[Int] = {
        println(s"dummy value : $n")
        Seq.fill(n)(Random.nextInt)
      }
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-06-24 15:25:02

这个问题是由于Task是延迟和异步的,但是autoClose两者都不是。

解决方案是使autoClose任务意识到:

代码语言:javascript
复制
def autoClose[A <: AutoCloseable, B](resource: A)(code: A ⇒ Task[B]): Task[B] = {
    code(resource).guarantee(Task(resource.close()))
  }

注意:这是由 奥列格·皮日科夫 on Monix Gitter聊天回答的。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56678588

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档