我正在使用scalikejdbc访问一个很大的表。我的理解是,在映射或迭代所有行之前,它会将所有行提取到内存中。
目前我有一个使用rxscala Observable的实现,它非常简单。但是接收器比读取sql慢,而且由于缓冲的原因,我得到了OutOfMemory。这是我目前的制片人,可以观察到:
def fetchProductsAsObservable(
sql: SQL[Nothing,NoExtractor],
extractor: (WrappedResultSet) => ProductItem)
) =
Observable[ProductItem](o =>
try {
sql.foreach(row => o.onNext(extractor(row)))
o.onCompleted()
} catch {
case e: Throwable => o.onError(e)
}
)我知道SQL.foreach方法,但它得到一个回调方法并返回Unit.我的背景是.NET,我不知道如何在scala中用scalikejdbc正确地实现一个简单的迭代器,我可以用它来进行并行处理?
发布于 2017-03-29 22:38:49
您的问题与ScalikeJDBC行为无关,而是与RxJava相关。因为你的观察物是由热源提供营养的,所以你需要使用背压策略。
https://stackoverflow.com/questions/40234598
复制相似问题