首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用scalaz-stream作为异步计算的实时编写器

使用scalaz-stream作为异步计算的实时编写器
EN

Stack Overflow用户
提问于 2014-08-26 07:33:07
回答 1查看 369关注 0票数 1

我有一个网络应用程序,它做了一堆缓慢的并发工作来计算其结果。我不想让最终用户挂起,而是想通过websocket回传进度更新。

我的代码库是由Scalaz (/)组成的,如下所示:

代码语言:javascript
复制
type ProcessResult = Error \/ Int

def downloadFile(url: String): Future[Error \/ String] = ???
def doSlowProcessing(data1: String, data2: String): Future[ProcessResult] = ???

/* Very simple however doesn't give any progress update */
def execute(): Future[ProcessResult] = {
 val download1 = downloadFile(...)
 val download2 = downloadFile(...)

 val et = for {
   d1 <- download1
   d2 <- download2
   processed <- doSlowProcessing(d1, d2)
 } yield processed   

 et.run 
}

这工作得很好,但当然,在我从Future中得到任何东西之前,需要完成整个计算。即使我堆叠在Writer monad上做日志记录,我也只能在日志完成后才能获得日志,这并不会让我的最终用户更满意。

我试着在代码运行时使用scalaz-stream队列发送日志作为副作用,但最终结果相当丑陋:

代码语言:javascript
复制
def execute(): Process[Task, String \/ ProcessResult] = {
 val (q, src) = async.queue[String \/ ProcessResult]

 val download1 = downloadFile(...)
 val download2 = downloadFile(...)

 val et = for {
   d1 <- q.enqueue("Downloading 1".left); download1
   d2 <- q.enqueue("Downloading 2".left); download2
   processed <- q.enqueue("Doing processing".left); doSlowProcessing(d1, d2)
 } yield processed    

 et.run.onSuccess {
  x =>
   q.enqueue(x.right)
   q.close
 }

 src
}

感觉应该有一种惯用的方式来实现这一点?如有必要,可以将我的SIP-14 Scala未来转换为任务。

EN

回答 1

Stack Overflow用户

发布于 2014-08-27 09:30:36

我不认为你需要使用队列,其中一种方法是使用wye的非确定性合并,即

代码语言:javascript
复制
type Result = ???
val download1: Process[Task,File] = ???
val download2: Process[Task,File] = ???


val result: Process[Task,(File,File)] = (download1 yip download2).once 

val processed: Process[Task, Result] = result.flatMap(doSlowProcessing)

// Run asynchronously, 
processed.runLast.runAsync {
  case Some(r) => .... // result computed
  case None => .... //no result, hence download1,2 were empty.
}

//or run synchronously awaiting the result
processed.runLast.run match {
  case Some(r) => .... // result computed
  case None => .... //no result 
}

//to capture the error information while download use 
val withError: Process[Task,Throwable\/File] = download1.attempt

//or to log and recover to other file download
val withError: Process[Task,File] download1 onFailure { err => Log(err); download3 }

这说得通吗?

还请注意,为了支持async.unboundedQueue,从0.5.0开始就弃用了async.queue

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/25495857

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档