首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用并发(akka)、异步API (nio2)向新文件写入输出、读取文件、进程行和写入输出

使用并发(akka)、异步API (nio2)向新文件写入输出、读取文件、进程行和写入输出
EN

Stack Overflow用户
提问于 2014-03-13 15:51:19
回答 3查看 5.7K关注 0票数 4

1:我在处理一个大型文本文件时遇到了一个问题-- 10Gigs+

单线程解决方案如下:

代码语言:javascript
复制
val writer = new PrintWriter(new File(output.getOrElse("output.txt")));
for(line <- scala.io.Source.fromFile(file.getOrElse("data.txt")).getLines())
{
  writer.println(DigestUtils.HMAC_SHA_256(line))
}
writer.close()

2:我尝试使用以下方法进行并发处理

代码语言:javascript
复制
val futures = scala.io.Source.fromFile(file.getOrElse("data.txt")).getLines
               .map{ s => Future{ DigestUtils.HMAC_SHA_256(s) } }.to
val results = futures.map{ Await.result(_, 10000 seconds) }

这会导致GC开销限制超过异常(请参见附录A中的堆栈跟踪)

3:我尝试使用Akka与AsynchronousFileChannel结合使用https://github.com/drexin/akka-io-file,我能够使用FileSlurp以字节块读取文件,但没有找到逐行读取文件的解决方案,这是一个要求。

任何帮助都将不胜感激。谢谢。

附录A

代码语言:javascript
复制
[error] (run-main) java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.nio.CharBuffer.wrap(Unknown Source)
        at sun.nio.cs.StreamDecoder.implRead(Unknown Source)
        at sun.nio.cs.StreamDecoder.read(Unknown Source)
        at java.io.InputStreamReader.read(Unknown Source)
        at java.io.BufferedReader.fill(Unknown Source)
        at java.io.BufferedReader.readLine(Unknown Source)
        at java.io.BufferedReader.readLine(Unknown Source)
        at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.s
cala:67)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:
48)
        at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:7
16)
        at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:6
92)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at com.test.Twitterhashconcurrentcli$.doConcurrent(Twitterhashconcu
rrentcli.scala:35)
        at com.test.Twitterhashconcurrentcli$delayedInit$body.apply(Twitter
hashconcurrentcli.scala:62)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:
12)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.generic.TraversableForwarder$class.foreach(Traversab
leForwarder.scala:32)
        at scala.App$class.main(App.scala:71)
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2014-03-13 17:02:36

这里的诀窍是避免将所有数据同时读取到内存中。如果您迭代并将行发送给工作人员,则会运行此风险,因为发送给一个参与者是异步的,因此您可能会将所有数据读入内存,并且它将位于参与者的邮箱中,可能导致OOM异常。一个更好的高级别方法是使用单个主演员和下面的童工池进行处理。这里的诀窍是在主服务器中对文件使用惰性流(就像从Iterator返回的scala.io.Source.fromX),然后在工作人员中使用work-pulling模式来防止他们的邮箱填充数据。然后,当迭代器不再有任何行时,主程序会停止工作(如果有必要,您也可以使用这个点来关闭参与者系统,如果这是您真正想要做的)。

这是一个非常粗略的大纲。请注意,我还没有测试这个:

代码语言:javascript
复制
import akka.actor._
import akka.routing.RoundRobinLike
import akka.routing.RoundRobinRouter
import scala.io.Source
import akka.routing.Broadcast

object FileReadMaster{
  case class ProcessFile(filePath:String)
  case class ProcessLines(lines:List[String], last:Boolean = false)
  case class LinesProcessed(lines:List[String], last:Boolean = false)

  case object WorkAvailable
  case object GimmeeWork
}

class FileReadMaster extends Actor{
  import FileReadMaster._

  val workChunkSize = 10
  val workersCount = 10

  def receive = waitingToProcess

  def waitingToProcess:Receive = {
    case ProcessFile(path) =>
      val workers = (for(i <- 1 to workersCount) yield context.actorOf(Props[FileReadWorker])).toList
      val workersPool = context.actorOf(Props.empty.withRouter(RoundRobinRouter(routees = workers)))
      val it = Source.fromFile(path).getLines
      workersPool ! Broadcast(WorkAvailable)
      context.become(processing(it, workersPool, workers.size))

      //Setup deathwatch on all
      workers foreach (context watch _)
  }

  def processing(it:Iterator[String], workers:ActorRef, workersRunning:Int):Receive = {
    case ProcessFile(path) => 
      sender ! Status.Failure(new Exception("already processing!!!"))


    case GimmeeWork if it.hasNext =>
      val lines = List.fill(workChunkSize){
        if (it.hasNext) Some(it.next)
        else None
      }.flatten

      sender ! ProcessLines(lines, it.hasNext)

      //If no more lines, broadcast poison pill
      if (!it.hasNext) workers ! Broadcast(PoisonPill)

    case GimmeeWork =>
      //get here if no more work left

    case LinesProcessed(lines, last) =>
      //Do something with the lines

    //Termination for last worker
    case Terminated(ref)  if workersRunning == 1 =>
      //Done with all work, do what you gotta do when done here

    //Terminared for non-last worker
    case Terminated(ref) =>
      context.become(processing(it, workers, workersRunning - 1))

  }
}

class FileReadWorker extends Actor{
  import FileReadMaster._

  def receive = {
    case ProcessLines(lines, last) => 
      sender ! LinesProcessed(lines.map(_.reverse), last)
      sender ! GimmeeWork

    case WorkAvailable =>
      sender ! GimmeeWork
  }
}

其思想是,主服务器遍历文件的内容,并将大块的工作发送给一个子工作者池。当文件处理开始时,主程序会告诉所有子程序工作是可用的。然后,每个孩子继续要求工作,直到没有更多的工作。当主人检测到文件被读取时,它会向孩子们播送一颗毒丸,让他们完成任何出色的工作,然后停止。当所有的孩子都停下来时,主人可以完成任何需要的清理工作。

再一次,这是非常粗糙的基础上,我认为你是问的。如果我不在任何地方,让我知道,我可以修改答案。

票数 14
EN

Stack Overflow用户

发布于 2014-03-13 16:44:14

实际上,在并行变体中,您首先尝试将所有文件作为行列表读入内存,然后获取一个副本(使用方法List.to)。很明显这是OOME的原因。

要并行化,首先要决定是否值得这样做。您不应该并行地从顺序文件(以及写入)读取:这只会导致磁头的过度移动,并使事情变慢。只有当DigestUtils.HMAC_SHA_256(s)所花费的时间比读取一行时间更长时,并行化才有意义。制定一个基准来衡量这两次。然后,如果您认为哈希代码计算的并行化是值得的,请找出工作线程的数量:想法是,经过的计算时间大致等于读取时间。让一个线程读取行,将它们打包成批(例如,一批1000行),并将批放在一个固定大小的ArrayBlockingQueue (例如,1000)中。批处理是必需的,因为队列中有太多的行和太多的同步操作,从而导致争用。让工作线程使用take方法从队列中读取批处理。

另外一个线程应该将结果写入"output.txt",并与阻塞队列连接。如果您必须在输出文件中保持行的顺序,那么应该使用更复杂的通信工具,而不是第二个队列,但这是另一回事。

票数 2
EN

Stack Overflow用户

发布于 2014-03-13 16:44:02

以下代码未经测试:)

映射到未来绝对不是一个好主意。

相反,由于您已经使用了Akka,我会介绍一个特殊的LineProcessor演员,然后发送行给它:

代码语言:javascript
复制
val processor = system.actorOf(Props(new LineProcessor))

val src = scala.io.Source.fromFile(file.getOrElse("data.txt"))

src.getLines.foreach(line => processor ! line)  

在LineProcessor内部,您可以封装逻辑来处理行:

代码语言:javascript
复制
class LineProcessor extends Actor {
  def receive {
    case line => // process the line
  }
}    

这里的诀窍是,对于演员,你可以很容易地进行横向缩放。把一个LineProcessor演员包在路由器里.

代码语言:javascript
复制
// this will create 10 workers to process your lines simultaneously
val processor = system.actorOf(Props(new LineProcessor).withRouter(RoundRobinRouter(10))

值得一提的是,如果您需要在保留顺序的地方写行,它就会变得更加棘手。(当从文件中读取一行时,还需要捕获它的编号,当将它写回时,您需要在所有工人之间进行协调)

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/22383939

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档