首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Scala中的并发处理

Scala中的并发处理
EN

Stack Overflow用户
提问于 2009-06-17 14:26:00
回答 5查看 2.3K关注 0票数 8

作为在我对自己的问题的回答中,我的情况是,我正在处理大量到达队列的事件。每个事件都以完全相同的方式处理,每个事件甚至可以独立于所有其他事件来处理。

我的程序利用了Scala并发框架,所涉及的许多流程都被建模为Actor。当Actor按顺序处理它们的消息时,它们不太适合这个特定的问题(尽管我的其他参与者正在执行顺序性的操作)。由于我希望Scala“控制”所有线程的创建(首先,我认为它有一个并发系统的意义),所以我似乎有两个选择:

  1. 将事件发送到我控制的事件处理器池中。
  2. 让我的Actor通过其他机制并发地处理它们

我认为#1否定了使用参与者子系统的意义:我应该创建多少处理器参与者?是个显而易见的问题。这些东西被认为是隐藏在我面前,并由子系统解决。

我的答案是:

代码语言:javascript
复制
val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

有没有更好的方法?这不对吗?

编辑:一个可能更好的方法是:

代码语言:javascript
复制
val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}
EN

回答 5

Stack Overflow用户

回答已采纳

发布于 2009-06-18 00:56:23

这似乎是对另一个问题的重复。所以我会重复我的答案

演员一次只处理一条消息。处理多条消息的典型模式是有一个协调器参与者前端,用于一个使用者参与者池。如果使用react,那么使用者池可以很大,但仍然只使用少量JVM线程。下面是一个示例,其中我创建了一个由10个消费者组成的池,并为他们创建了一个协调员。

代码语言:javascript
复制
import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

此代码测试哪些使用者可用,并向该使用者发送请求。替代方案只是随机分配给使用者,或者使用循环调度程序。

根据您正在做的事情,Scala的期货可能会为您提供更好的服务。例如,如果你真的不需要演员,那么上面所有的机器都可以写成

代码语言:javascript
复制
import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))
票数 8
EN

Stack Overflow用户

发布于 2009-06-17 14:56:37

如果所有事件都可以独立处理,为什么它们都在队列中?对你的设计一无所知,这似乎是一个不必要的步骤。如果您可以使用触发这些事件的任何内容组合process函数,则可能会排除队列。

演员本质上是一种并行效果,它配备了一个队列。如果你想同时处理多条消息,你不需要一个演员。您只希望一个函数(任何=> ())在某个方便的时间被安排执行。

尽管如此,如果您想留在参与者库中,如果事件队列不在您的控制范围内,那么您的方法是合理的

斯卡拉兹区分了行为者和并发效应。虽然它的Actor很轻,但scalaz.concurrent.Effect更轻.下面是粗略翻译到Scalaz库的代码:

代码语言:javascript
复制
val eventProcessor = effect (x => process x)

这是最新的后备箱头,还没有发布。

票数 3
EN

Stack Overflow用户

发布于 2009-06-17 15:21:28

这听起来像是一个简单的消费者/生产者问题。我会用队列和一群消费者。您可能可以使用java.util.concurrent编写几行代码。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/1007371

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档