首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何重写下面的rx-java爬虫

如何重写下面的rx-java爬虫
EN

Stack Overflow用户
提问于 2016-12-07 14:52:42
回答 1查看 197关注 0票数 0

爬虫有一个记录要爬行的url的urlQueue,一个模拟的异步url抓取器。

我试着用rx-java的风格来写它。首先,我像这样尝试Flowable.generate

代码语言:javascript
复制
    Flowable.generate((Consumer<Emitter<Integer>>) e -> {
        final Integer poll = demo.urlQueue.poll();
        if (poll != null) {
            e.onNext(poll);
        } else if (runningCount.get() == 0) {
            e.onComplete();
        }
    }).flatMap(i -> {
        runningCount.incrementAndGet();
        return demo.urlFetcher.asyncFetchUrl(i);
    }, 10)
            .doOnNext(page -> demo.onSuccess(page))
            .subscribe(page -> runningCount.decrementAndGet());

但是它不会工作,因为在开始时,urlQueue中可能只有一个种子,所以generate被调用了10次,但只发出了一个e.onNext。只有当它完成时,才会调用下一个请求(1)-> generate。

虽然在代码中,我们将flatMap maxConcurrency指定为10,但它将逐个爬行。

在那之后,我像下面这样修改代码,它可以像预期的那样工作。

但在代码中,我应该关心当前有多少任务正在运行,然后计算应该从队列中提取多少任务,我认为rx-java应该完成这项工作。

我不确定代码是否可以用一种更简单的方式重写。

代码语言:javascript
复制
public class CrawlerDemo {
    private static Logger logger = LoggerFactory.getLogger(CrawlerDemo.class);

    // it can be redis queue or other queue
    private BlockingQueue<Integer> urlQueue = new LinkedBlockingQueue<>();

    private static AtomicInteger runningCount = new AtomicInteger(0);

    private static final int MAX_CONCURRENCY = 5;

    private UrlFetcher urlFetcher = new UrlFetcher();

    private void addSeed(int i) {
        urlQueue.offer(i);
    }

    private void onSuccess(Page page) {
        page.links.forEach(i -> {
            logger.info("offer more url " + i);
            urlQueue.offer(i);
        });
    }

    private void start(BehaviorProcessor processor) {
        final Integer poll = urlQueue.poll();
        if (poll != null) {
            processor.onNext(poll);

        } else {
            processor.onComplete();
        }
    }

    private int dispatchMoreLink(BehaviorProcessor processor) {

        int links = 0;
        while (runningCount.get() <= MAX_CONCURRENCY) {
            final Integer poll = urlQueue.poll();
            if (poll != null) {
                processor.onNext(poll);

                links++;
            } else {
                if (runningCount.get() == 0) {
                    processor.onComplete();
                }
                break;
            }
        }

        return links;
    }

    private Flowable<Page> asyncFetchUrl(int i) {
        return urlFetcher.asyncFetchUrl(i);
    }


    public static void main(String[] args) throws InterruptedException {
        CrawlerDemo demo = new CrawlerDemo();
        demo.addSeed(1);

        BehaviorProcessor<Integer> processor = BehaviorProcessor.create();

        processor
                .flatMap(i -> {
                    runningCount.incrementAndGet();
                    return demo.asyncFetchUrl(i)
                            .doFinally(() -> runningCount.decrementAndGet())
                            .doFinally(() -> demo.dispatchMoreLink(processor));
                }, MAX_CONCURRENCY)
                .doOnNext(page -> demo.onSuccess(page))
                .subscribe();

        demo.start(processor);


    }


}

class Page {
    public List<Integer> links = new ArrayList<>();
}

class UrlFetcher {
    static Logger logger = LoggerFactory.getLogger(UrlFetcher.class);


    final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    public Flowable<Page> asyncFetchUrl(Integer url) {

        logger.info("start async get " + url);
        return Flowable.defer(() -> emitter ->
                scheduledExecutorService.schedule(() -> {

                    Page page = new Page();
                    // the website urls no more than 1000
                    if (url < 1000) {
                        page.links = IntStream.range(1, 5).boxed().map(j -> 10 * url + j).collect(Collectors.toList());
                    }

                    logger.info("finish async get " + url);
                    emitter.onNext(page);
                    emitter.onComplete();
                }, 5, TimeUnit.SECONDS));                                 // cost 5 seconds to access url
    }
}
EN

回答 1

Stack Overflow用户

发布于 2017-09-01 03:04:27

您正在尝试将常规(非Rx)代码与RxJava一起使用,但未获得您想要的结果。

首先要做的是将urlQueue.poll()转换为Flowable<Integer>

代码语言:javascript
复制
Flowable.generate((Consumer<Emitter<Integer>>) e -> {
    final Integer take = demo.urlQueue.take(); // Note 1
    e.onNext(take); // Note 2
})
    .observeOn(Schedulers.io(), 1) // Note 3
    .flatMap(i -> demo.urlFetcher.asyncFetchUrl(i), 10)
    .subscribe(page -> demo.onSuccess(page));

  1. 以被动的方式读取队列意味着阻塞等待。尝试poll()队列增加了一层复杂性,RxJava允许您跳过这一层。
  2. 将接收到的值传递给任何订阅者。如果需要指示完成,则需要添加外部布尔值,或使用带内指示符(例如负integer).
  3. observeOn()操作符将订阅生成器。值1将只导致一个订阅,因为拥有多个订阅是没有意义的。

代码的其余部分与您所拥有的代码类似。之所以会出现问题,是因为flatMap(...,10)操作将订阅生成器10次,而这并不是您想要的。您希望限制同时获取的数量。为了防止过早退出生成器,添加runningCount是一件繁琐的事情,但它不能替代在urlQueue上发出数据结束信号的正确方法。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41011150

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档