Akka streams大大减少了我的样板代码,并包含了许多有用的功能。但是,我需要能够限制项目的处理速度。问题是,随着时间的推移(从单个在线网站下载),我正在给连接到资源源的Hazelcast队列提供下载,但进入队列的链接数量可能会变得相当大。理想情况下,一次运行的请求不超过50-60个。Akka Streams中有没有一个功能可以让我限制一次处理的项目数量?
另一个限制是在与某些网站交互时需要复杂的状态管理、代码处理和其他功能。Akka Http在这里是无能为力的。我的网络代码完全是用Jsoup和Apache Http组件编写的,偶尔会调用基于JavaFX的服务器来呈现脚本。
我目前尝试使用文档中描述的缓冲区来控制输入速率,如下所示:
val sourceGraph: Graph[SourceShape[(FlowConfig, Term)], NotUsed] = new HazelcastTermSource(conf.termQueue, conf)
val source = Source.fromGraph(sourceGraph)
val (killSwitch, last) = source
.buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
.viaMat(new DownloadFlow())(Keep.both)
.map(x => println(x))
.to(Sink.ignore).run()https://stackoverflow.com/questions/44447390
复制相似问题