首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用Continuations拆分和分派异步控制流?

如何使用Continuations拆分和分派异步控制流?
EN

Stack Overflow用户
提问于 2010-03-15 19:41:25
回答 3查看 1K关注 0票数 13

我有一个异步控制流,如下所示:

代码语言:javascript
复制
ActorA ! DoA(dataA, callback1, callbackOnErrorA)

def callback1() = {
  ...
  ActorB ! DoB(dataB, callback2, callbackOnErrorB)
}

def callback2() = {
  ActorC ! DoC(dataC, callback3, callbackOnErrorC)
} 

...

我如何将这个流程分成几个部分(延续),并在维护整体状态的同时,将这些部分顺序地分派给不同的参与者(或线程/任务)?

任何提示都很感谢,谢谢

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2010-03-16 06:56:28

这非常简单,但展示了如何在三个参与者之间拆分单个控制流,将状态传递给每个参与者:

代码语言:javascript
复制
package blevins.example

import scala.continuations._
import scala.continuations.ControlContext._
import scala.actors.Actor._
import scala.actors._

object App extends Application {

  val actorA, actorB, actorC = actor {
    receive {
      case f: Function1[Unit,Unit] => { f() }
    }
  }

  def handle(a: Actor) = shift { k: (Unit=>Unit) =>
    a ! k
  }

  // Control flow to split up
  reset {
      // this is not handled by any actor
      var x = 1
      println("a: " + x)

      handle(actorA)  // actorA handles the below
      x += 4
      println("b: " + x)

      handle(actorB) // then, actorB handles the rest
      var y = 2
      x += 2
      println("c: " + x)

      handle(actorC) // and so on...
      y += 1
      println("d: " + x + ":" + y)
  }

}
票数 7
EN

Stack Overflow用户

发布于 2010-03-15 22:05:36

我喜欢使用scalaz.concurrent.Promise。这个例子与你问题中的例子并不完全一样,但它给了你一些启发。

代码语言:javascript
复制
object Async extends Application {
  import scalaz._
  import Scalaz._
  import concurrent._
  import concurrent.strategy._
  import java.util.concurrent.{ExecutorService, Executors}

  case class ResultA(resultb: ResultB, resulta: ResultC)
  case class ResultB()
  case class ResultC()

  run

  def run {
    implicit val executor: ExecutorService = Executors.newFixedThreadPool(8)
    import Executor.strategy

    val promiseA = doA
    println("waiting for results")
    val a: ResultA = promiseA.get
    println("got " + a)
    executor.shutdown    
  }

  def doA(implicit s: Strategy[Unit]): Promise[ResultA] = {
    println("triggered A")
    val b = doB
    val c = doC
    for {bb <- b; cc <- c} yield ResultA(bb, cc)
  }

  def doB(implicit s: Strategy[Unit]): Promise[ResultB] = {
    println("triggered B")
    promise { Thread.sleep(1000); println("returning B"); ResultB() }
  }

  def doC(implicit s: Strategy[Unit]): Promise[ResultC] = {
    println("triggered C")
    promise { Thread.sleep(1000); println("returning C"); ResultC() }
  }
}

输出:

代码语言:javascript
复制
triggered A
triggered B
triggered C
waiting for results
returning B
returning C
got ResultA(ResultB(),ResultC())

您可以在Runar的这个presentation中找到对Scalaz并发性的介绍。

这种方法不像Actors那样灵活,但组合得更好,不会死锁。

票数 9
EN

Stack Overflow用户

发布于 2011-09-08 02:26:52

Akka's Futures and how to compose themscalaz's Promises,它们几乎是一样的,只有细微的差别。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/2446770

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档