首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Akka Streams mapAsync停止解析

使用Akka Streams mapAsync停止解析
EN

Stack Overflow用户
提问于 2018-02-13 22:20:15
回答 1查看 115关注 0票数 0

我正在分析50000条记录,其中包含他们的标题和网页上的网址。在解析时,我将它们写入数据库,即PostgreSQL。我使用docker-compose部署了我的应用程序。然而,它总是无缘无故地停在某个页面上。我试着写一些日志来弄清楚发生了什么,但没有连接错误或类似的东西。

下面是我解析和写入数据库的代码:

代码语言:javascript
复制
object App {
  val db = Database.forURL("jdbc:postgresql://db:5432/toloka?user=user&password=password")
  val browser = JsoupBrowser()
  val catRepo = new CategoryRepo(db)
  val torrentRepo = new TorrentRepo(db)
  val torrentForParseRepo = new TorrentForParseRepo(db)
  val parallelismFactor = 10
  val groupFactor = 10
  implicit val system = ActorSystem("TolokaParser")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

def parseAndWriteTorrentsForParseToDb(doc: App.browser.DocumentType) = {
    Source(getRecordsLists(doc))
      .grouped(groupFactor)
      .mapAsync(parallelismFactor) { torrentForParse: Seq[TorrentForParse] =>
        torrentForParseRepo.createInBatch(torrentForParse)
      }
      .runWith(Sink.ignore)
  }

 def getRecordsLists(doc: App.browser.DocumentType) = {
    val pages = generatePagesFromHomePage(doc)
    println("torrent links generated")
    println(pages.size)
    val result = for {
      page <- pages
    } yield {
      println(s"Parsing torrent list...$page")
      val tmp = getTitlesAndLinksTuple(getTitlesList(browser.get(page)), getLinksList(browser.get(page)))
      println(tmp.size)
      tmp
    }
    println("torrent links and names tupled")
    result flatten
  }

}

这些问题的原因可能是什么?

EN

回答 1

Stack Overflow用户

发布于 2018-02-14 06:11:53

设置监控策略,避免错误情况下的流终结。例如:

代码语言:javascript
复制
val decider: Supervision.Decider = {
  case _ => Supervision.Resume
}

def parseAndWriteTorrentsForParseToDb = {
  Source.fromIterator(() => List(1,2,3).toIterator)
    .grouped(1)
    .mapAsync(1) { torrentForParse: Seq[Int] =>
      Future { 0 }
    }
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
    .runWith(Sink.ignore)
}

流不应使用此异步阶段配置停止

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48768942

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档