我有一个网络应用程序,它做了一堆缓慢的并发工作来计算其结果。我不想让最终用户挂起,而是想通过websocket回传进度更新。
我的代码库是由Scalaz (/)组成的,如下所示:
type ProcessResult = Error \/ Int
def downloadFile(url: String): Future[Error \/ String] = ???
def doSlowProcessing(data1: String, data2: String): Future[ProcessResult] = ???
/* Very simple however doesn't give any progress update */
def execute(): Future[ProcessResult] = {
val download1 = downloadFile(...)
val download2 = downloadFile(...)
val et = for {
d1 <- download1
d2 <- download2
processed <- doSlowProcessing(d1, d2)
} yield processed
et.run
}这工作得很好,但当然,在我从Future中得到任何东西之前,需要完成整个计算。即使我堆叠在Writer monad上做日志记录,我也只能在日志完成后才能获得日志,这并不会让我的最终用户更满意。
我试着在代码运行时使用scalaz-stream队列发送日志作为副作用,但最终结果相当丑陋:
def execute(): Process[Task, String \/ ProcessResult] = {
val (q, src) = async.queue[String \/ ProcessResult]
val download1 = downloadFile(...)
val download2 = downloadFile(...)
val et = for {
d1 <- q.enqueue("Downloading 1".left); download1
d2 <- q.enqueue("Downloading 2".left); download2
processed <- q.enqueue("Doing processing".left); doSlowProcessing(d1, d2)
} yield processed
et.run.onSuccess {
x =>
q.enqueue(x.right)
q.close
}
src
}感觉应该有一种惯用的方式来实现这一点?如有必要,可以将我的SIP-14 Scala未来转换为任务。
发布于 2014-08-27 09:30:36
我不认为你需要使用队列,其中一种方法是使用wye的非确定性合并,即
type Result = ???
val download1: Process[Task,File] = ???
val download2: Process[Task,File] = ???
val result: Process[Task,(File,File)] = (download1 yip download2).once
val processed: Process[Task, Result] = result.flatMap(doSlowProcessing)
// Run asynchronously,
processed.runLast.runAsync {
case Some(r) => .... // result computed
case None => .... //no result, hence download1,2 were empty.
}
//or run synchronously awaiting the result
processed.runLast.run match {
case Some(r) => .... // result computed
case None => .... //no result
}
//to capture the error information while download use
val withError: Process[Task,Throwable\/File] = download1.attempt
//or to log and recover to other file download
val withError: Process[Task,File] download1 onFailure { err => Log(err); download3 }这说得通吗?
还请注意,为了支持async.unboundedQueue,从0.5.0开始就弃用了async.queue
https://stackoverflow.com/questions/25495857
复制相似问题