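I am parsing 50,000 records from a website; each record consists of a title and a URL. While parsing, I write them to a database, PostgreSQL. I deployed my application with docker-compose. However, it always stops on some page for no apparent reason. I tried adding some logging to figure out what is happening, but there are no connection errors or anything similar.
Here is my code for parsing and writing to the database: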
// Imports assume Slick, scala-scraper and Akka Streams, which the calls below suggest.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import net.ruippeixotog.scalascraper.browser.JsoupBrowser
import slick.jdbc.PostgresProfile.api._

object App {
  val db = Database.forURL("jdbc:postgresql://db:5432/toloka?user=user&password=password")
  val browser = JsoupBrowser()
  val catRepo = new CategoryRepo(db)
  val torrentRepo = new TorrentRepo(db)
  val torrentForParseRepo = new TorrentForParseRepo(db)
  val parallelismFactor = 10
  val groupFactor = 10
  implicit val system = ActorSystem("TolokaParser")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  // Writes the parsed records in batches of groupFactor, up to parallelismFactor batches at a time.
  def parseAndWriteTorrentsForParseToDb(doc: App.browser.DocumentType) = {
    Source(getRecordsLists(doc))
      .grouped(groupFactor)
      .mapAsync(parallelismFactor) { torrentForParse: Seq[TorrentForParse] =>
        torrentForParseRepo.createInBatch(torrentForParse)
      }
      .runWith(Sink.ignore)
  }

  // Fetches every page and collects its (title, link) records before the stream starts.
  def getRecordsLists(doc: App.browser.DocumentType) = {
    val pages = generatePagesFromHomePage(doc)
    println("torrent links generated")
    println(pages.size)
    val result = for {
      page <- pages
    } yield {
      println(s"Parsing torrent list...$page")
      val pageDoc = browser.get(page) // fetch the page once and reuse it for titles and links
      val tmp = getTitlesAndLinksTuple(getTitlesList(pageDoc), getLinksList(pageDoc))
      println(tmp.size)
      tmp
    }
    println("torrent links and names tupled")
    result.flatten
  }
}
What could be causing these problems?
Posted on 2018-02-14 06:11:53
Set up a supervision strategy so that the stream is not terminated when an error occurs. For example:
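// Assumes an implicit materializer and ExecutionContext in scope, as in the question's App object.
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

// Resume on any exception: the failing element is dropped and the stream keeps running.
val decider: Supervision.Decider = {
  case _ => Supervision.Resume
}

def parseAndWriteTorrentsForParseToDb = {
  Source.fromIterator(() => List(1, 2, 3).iterator)
    .grouped(1)
    .mapAsync(1) { torrentForParse: Seq[Int] =>
      Future { 0 }
    }
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
    .runWith(Sink.ignore)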
}
With this configuration, the stream should not stop at the asynchronous stage.
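Since the question mentions that no errors showed up in the logs, it may also help to make failures visible instead of silently resuming. The following is only a sketch under assumptions: `writeBatch` is a hypothetical stand-in for `torrentForParseRepo.createInBatch` that deliberately fails on one batch, `loggingDecider` is an illustrative name, and `ActorMaterializer` is used to match the question's code (it is deprecated in newer Akka versions). The decider prints each exception before resuming, and the `Future` returned by `runWith` is observed so that a stream failure would be reported rather than discarded.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object SupervisionSketch {
  implicit val system: ActorSystem = ActorSystem("SupervisionSketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContext = system.dispatcher

  // Log every failure before resuming, so dropped batches show up in the output.
  val loggingDecider: Supervision.Decider = { e =>
    println(s"Stage failed, element dropped: ${e.getMessage}")
    Supervision.Resume
  }

  // Hypothetical stand-in for the real database write: fails for the batch containing 3.
  def writeBatch(batch: Seq[Int]): Future[Int] =
    if (batch.contains(3)) Future.failed(new RuntimeException(s"cannot write $batch"))
    else Future.successful(batch.size)

  // Returns the number of records whose batch was written successfully.
  def run(): Future[Int] =
    Source(1 to 6)
      .grouped(1)
      .mapAsync(2)(writeBatch)
      .withAttributes(ActorAttributes.supervisionStrategy(loggingDecider))
      .runWith(Sink.fold[Int, Int](0)(_ + _))

  def main(args: Array[String]): Unit = {
    val done = run()
    // Observe the materialized Future instead of discarding it.
    done.onComplete {
      case Success(n) => println(s"Stream completed, $n records written")
      case Failure(e) => println(s"Stream failed: ${e.getMessage}")
    }
    Await.ready(done, 10.seconds)
    system.terminate()
  }
}
```

In the original code the `Future` returned by `runWith(Sink.ignore)` is never inspected, so a failed stream would look exactly like a stream that simply stopped. Attaching `onComplete` (or logging inside the decider) is one way to find out which page or batch triggers the failure.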
https://stackoverflow.com/questions/48768942