我需要处理一个“大”文件(不适合内存)。
我想对数据进行分批处理。例如,我想将它们插入到数据库中。但是,由于它太大,无法容纳在内存中,它也太慢,处理元素一个接一个。
所以我喜欢从一个Iterator[Something]到一个Iterator[Iterable[Something]]到批处理元素。
从以下几点开始:
CSVReader.open(new File("big_file"))
.iteratorWithHeaders
.map(Something.parse)
.foreach(Jdbi.insertSomething)我可以用可变的序列在foreach语句中做一些脏的事情,并刷新每一个x元素,但是我确信有一种更明智的方法。
// Yuk... :-(
val buffer = ArrayBuffer[Something]()
CSVReader.open(new File("big_file"))
.iteratorWithHeaders
.map(Something.parse)
.foreach {
something =>
buffer.append(something)
if (buffer.size == 1000) {
Jdbi.insertSomethings(buffer.toList)
buffer.clear()
}
}
Jdbi.insertSomethings(buffer.toList)发布于 2015-10-24 16:18:02
如果您的批处理可以具有固定的大小(如您的示例中所示),那么Scala的grouped方法的Iterator将完全满足您的需要:
val iterator = Iterator.continually(1)
iterator.grouped(10000).foreach(xs => println(xs.size))这将在恒定的内存中运行(当然,不包括终端存储在内存中的任何文本)。
我不知道您的iteratorWithHeaders返回了什么,但是如果它是Java,您可以将它转换成像这样的Scala:
import scala.collection.JavaConverters.
val myScalaIterator: Iterator[Int] = myJavaIterator.asScala这将保持适当的懒惰。
发布于 2015-10-24 16:20:03
如果我正确地解决了您的问题,您可以只使用Iterator.grouped。因此,稍微修改一下您的示例:
val si: Iterator[Something] = CSVReader.open(new File("big_file"))
.iteratorWithHeaders
.map(Something.parse)
val gsi: GroupedIterator[Something] = si.grouped(1000)
gsi.foreach { slst: List[Something] =>
Jdbi.insertSomethings(slst)
}https://stackoverflow.com/questions/33320146
复制相似问题