作为在我对自己的问题的回答中,我的情况是,我正在处理大量到达队列的事件。每个事件都以完全相同的方式处理,每个事件甚至可以独立于所有其他事件来处理。
我的程序利用了Scala并发框架,所涉及的许多流程都被建模为Actor。当Actor按顺序处理它们的消息时,它们不太适合这个特定的问题(尽管我的其他参与者正在执行顺序性的操作)。由于我希望Scala“控制”所有线程的创建(首先,我认为它有一个并发系统的意义),所以我似乎有两个选择:
Actor通过其他机制并发地处理它们我认为#1否定了使用参与者子系统的意义:我应该创建多少处理器参与者?是个显而易见的问题。这些东西被认为是隐藏在我面前,并由子系统解决。
我的答案是:
val eventProcessor = actor {
loop {
react {
case MyEvent(x) =>
//I want to be able to handle multiple events at the same time
//create a new actor to handle it
actor {
//processing code here
process(x)
}
}
}
}有没有更好的方法?这不对吗?
编辑:一个可能更好的方法是:
val eventProcessor = actor {
loop {
react {
case MyEvent(x) =>
//Pass processing to the underlying ForkJoin framework
Scheduler.execute(process(e))
}
}
}发布于 2009-06-18 00:56:23
这似乎是对另一个问题的重复。所以我会重复我的答案
演员一次只处理一条消息。处理多条消息的典型模式是有一个协调器参与者前端,用于一个使用者参与者池。如果使用react,那么使用者池可以很大,但仍然只使用少量JVM线程。下面是一个示例,其中我创建了一个由10个消费者组成的池,并为他们创建了一个协调员。
import scala.actors.Actor
import scala.actors.Actor._
case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop
def consumer(n : Int) = actor {
loop {
react {
case Ready(sender) =>
sender ! Ready(self)
case Request(sender, payload) =>
println("request to consumer " + n + " with " + payload)
// some silly computation so the process takes awhile
val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
sender ! Result(result)
println("consumer " + n + " is done processing " + result )
case Stop => exit
}
}
}
// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)
val coordinator = actor {
loop {
react {
case msg @ Request(sender, payload) =>
consumers foreach {_ ! Ready(self)}
react {
// send the request to the first available consumer
case Ready(consumer) => consumer ! msg
}
case Stop =>
consumers foreach {_ ! Stop}
exit
}
}
}
// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)此代码测试哪些使用者可用,并向该使用者发送请求。替代方案只是随机分配给使用者,或者使用循环调度程序。
根据您正在做的事情,Scala的期货可能会为您提供更好的服务。例如,如果你真的不需要演员,那么上面所有的机器都可以写成
import scala.actors.Futures._
def transform(payload : String) = {
val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
println("transformed " + payload + " to " + result )
result
}
val results = for (i <- 0 to 1000) yield future(transform(i.toString))发布于 2009-06-17 14:56:37
如果所有事件都可以独立处理,为什么它们都在队列中?对你的设计一无所知,这似乎是一个不必要的步骤。如果您可以使用触发这些事件的任何内容组合process函数,则可能会排除队列。
演员本质上是一种并行效果,它配备了一个队列。如果你想同时处理多条消息,你不需要一个演员。您只希望一个函数(任何=> ())在某个方便的时间被安排执行。
尽管如此,如果您想留在参与者库中,如果事件队列不在您的控制范围内,那么您的方法是合理的。
斯卡拉兹区分了行为者和并发效应。虽然它的Actor很轻,但scalaz.concurrent.Effect更轻.下面是粗略翻译到Scalaz库的代码:
val eventProcessor = effect (x => process x)这是最新的后备箱头,还没有发布。
发布于 2009-06-17 15:21:28
这听起来像是一个简单的消费者/生产者问题。我会用队列和一群消费者。您可能可以使用java.util.concurrent编写几行代码。
https://stackoverflow.com/questions/1007371
复制相似问题