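My crawler has a urlQueue that records the URLs still to be crawled, and a mock asynchronous URL fetcher.
I am trying to write it in RxJava style. First, I tried Flowable.generate like this: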

Flowable.generate((Consumer<Emitter<Integer>>) e -> {
    final Integer poll = demo.urlQueue.poll();
    if (poll != null) {
        e.onNext(poll);
    } else if (runningCount.get() == 0) {
        e.onComplete();
    }
}).flatMap(i -> {
    runningCount.incrementAndGet();
    return demo.urlFetcher.asyncFetchUrl(i);
}, 10)
    .doOnNext(page -> demo.onSuccess(page))
    .subscribe(page -> runningCount.decrementAndGet());

But it does not work. At the start there may be only one seed in urlQueue, so generate is called 10 times but only one e.onNext is emitted. The next request(1) -> generate call happens only after that fetch has finished.
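So although the code sets flatMap's maxConcurrency to 10, the crawl proceeds one URL at a time.
After that, I changed the code as shown below, and it works as expected.
But now my own code has to track how many tasks are currently running and work out how many more to pull from the queue, which I think RxJava should be doing for me.
I am not sure whether the code can be rewritten in a simpler way.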
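
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import io.reactivex.Flowable;
import io.reactivex.processors.BehaviorProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CrawlerDemo {
    private static Logger logger = LoggerFactory.getLogger(CrawlerDemo.class);

    // it can be redis queue or other queue
    private BlockingQueue<Integer> urlQueue = new LinkedBlockingQueue<>();
    private static AtomicInteger runningCount = new AtomicInteger(0);
    private static final int MAX_CONCURRENCY = 5;
    private UrlFetcher urlFetcher = new UrlFetcher();

    private void addSeed(int i) {
        urlQueue.offer(i);
    }

    // Queue every link found on a fetched page.
    private void onSuccess(Page page) {
        page.links.forEach(i -> {
            logger.info("offer more url " + i);
            urlQueue.offer(i);
        });
    }

    // Push the first URL into the processor, or complete if the queue is empty.
    private void start(BehaviorProcessor<Integer> processor) {
        final Integer poll = urlQueue.poll();
        if (poll != null) {
            processor.onNext(poll);
        } else {
            processor.onComplete();
        }
    }

    // Called whenever a fetch finishes: refill the processor from the queue,
    // and complete once the queue is empty and nothing is running.
    private int dispatchMoreLink(BehaviorProcessor<Integer> processor) {
        int links = 0;
        while (runningCount.get() <= MAX_CONCURRENCY) {
            final Integer poll = urlQueue.poll();
            if (poll != null) {
                processor.onNext(poll);
                links++;
            } else {
                if (runningCount.get() == 0) {
                    processor.onComplete();
                }
                break;
            }
        }
        return links;
    }

    private Flowable<Page> asyncFetchUrl(int i) {
        return urlFetcher.asyncFetchUrl(i);
    }

    public static void main(String[] args) throws InterruptedException {
        CrawlerDemo demo = new CrawlerDemo();
        demo.addSeed(1);
        BehaviorProcessor<Integer> processor = BehaviorProcessor.create();
        processor
            .flatMap(i -> {
                runningCount.incrementAndGet();
                return demo.asyncFetchUrl(i)
                    .doFinally(() -> runningCount.decrementAndGet())
                    .doFinally(() -> demo.dispatchMoreLink(processor));
            }, MAX_CONCURRENCY)
            .doOnNext(page -> demo.onSuccess(page))
            .subscribe();
        demo.start(processor);
    }
}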

class Page {
    public List<Integer> links = new ArrayList<>();
}

class UrlFetcher {
    static Logger logger = LoggerFactory.getLogger(UrlFetcher.class);
    final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    public Flowable<Page> asyncFetchUrl(Integer url) {
        logger.info("start async get " + url);
        return Flowable.defer(() -> emitter ->
            scheduledExecutorService.schedule(() -> {
                Page page = new Page();
                // the website urls no more than 1000
                if (url < 1000) {
                    page.links = IntStream.range(1, 5).boxed().map(j -> 10 * url + j).collect(Collectors.toList());
                }
                logger.info("finish async get " + url);
                emitter.onNext(page);
                emitter.onComplete();
            }, 5, TimeUnit.SECONDS)); // cost 5 seconds to access url
    }
}

Posted on 2017-09-01 03:04:27
You are trying to use regular (non-Rx) code with RxJava, and you are not getting the results you want.
The first thing to do is to turn urlQueue.poll() into a Flowable<Integer>:
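
Flowable.generate((Consumer<Emitter<Integer>>) e -> {
    final Integer take = demo.urlQueue.take(); // Note 1
    e.onNext(take); // Note 2
})
    .observeOn(Schedulers.io(), false, 1) // Note 3 (delayError = false, bufferSize = 1)
    .flatMap(i -> demo.urlFetcher.asyncFetchUrl(i), 10)
    .subscribe(page -> demo.onSuccess(page));

Note 1: poll()ing the queue adds a layer of complexity that RxJava lets you skip; take() simply blocks until a URL is available.
Note 3: the observeOn() operator sits between the generator and flatMap(). Its buffer size of 1 means it requests only one item at a time from the generator, since there is no point in asking for more.
The rest of the code is similar to what you have. The problem arose because flatMap(..., 10) requests 10 items from the generator up front, which is not what you want; you only want to limit the number of simultaneous fetches. Adding runningCount to keep the generator from completing prematurely is a kludge, and it is no substitute for a proper way of signalling end-of-data on urlQueue.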
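
One possible way to signal end-of-data on urlQueue is a sentinel ("poison pill") value that is offered once nothing is queued or in flight. The sketch below is plain Java without RxJava, so that it runs on its own. The class TerminatingUrlQueue, the END_OF_DATA value, the pending counter and the crawlAll helper are all hypothetical names introduced for illustration, not part of the original code, and the sketch assumes real URLs are never negative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the original code.
class TerminatingUrlQueue {
    // Sentinel offered once nothing is queued or in flight.
    // Assumption: real URLs are never negative.
    static final int END_OF_DATA = -1;

    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    // URLs offered whose fetch has not yet been reported as finished.
    private final AtomicInteger pending = new AtomicInteger();

    void offer(int url) {
        pending.incrementAndGet();
        queue.offer(url);
    }

    // Blocks until a URL or END_OF_DATA is available.
    int take() throws InterruptedException {
        return queue.take();
    }

    // Call only after all links found on the fetched page have been offered,
    // so the counter cannot reach zero while work is still being discovered.
    void fetchFinished() {
        if (pending.decrementAndGet() == 0) {
            queue.offer(END_OF_DATA);
        }
    }

    // Single-threaded demo crawl using the same link rule as UrlFetcher:
    // a url below `limit` links to 10 * url + 1 .. 10 * url + 4.
    static List<Integer> crawlAll(int seed, int limit) {
        TerminatingUrlQueue urls = new TerminatingUrlQueue();
        List<Integer> visited = new ArrayList<>();
        urls.offer(seed);
        try {
            while (true) {
                int url = urls.take();
                if (url == END_OF_DATA) {
                    return visited;
                }
                visited.add(url);
                if (url < limit) {
                    for (int j = 1; j < 5; j++) {
                        urls.offer(10 * url + j);
                    }
                }
                urls.fetchFinished();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(crawlAll(1, 10)); // prints [1, 11, 12, 13, 14]
    }
}
```

With a queue like this, the generator could presumably call e.onComplete() when take() returns END_OF_DATA and e.onNext(take) otherwise, with onSuccess calling fetchFinished() after it has offered the page's links. That keeps the termination logic in the queue rather than in a separate runningCount.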
https://stackoverflow.com/questions/41011150