首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >生产者/消费者模型中的文件读取

生产者/消费者模型中的文件读取
EN

Stack Overflow用户
提问于 2018-05-02 19:33:42
回答 1查看 420关注 0票数 0

我试图从文件中读取字符串,使用该字符串执行HTTP请求,如果请求返回200,则使用该请求执行另一个HTTP请求。

我认为这方面的一个好模式应该是生产者消费模式,但出于某种原因,我完全被困住了。由于某种原因,整个过程只是在某一时刻停止,我不知道为什么。

代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException, IOException {

    ArrayBlockingQueue<String> subQueue = new ArrayBlockingQueue<>(3000000);

    ThreadPoolExecutor consumers = new ThreadPoolExecutor(100, 100, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
    ThreadPoolExecutor producers = new ThreadPoolExecutor(100, 100, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10000000));
    consumers.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    String fileName = "test";
    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
        String line;
        while ((line = br.readLine()) != null) {
            String address = new JSONObject(line).getString("Address");
            producers.submit(new Thread(() -> {
                if (requestReturn200(address)) {
                    try {
                        subQueue.put(address);
                    } catch (InterruptedException e) {
                        System.out.println("Error producing.");
                    }
                }
            }));
        }
        producers.shutdown();
    }

    while (subQueue.size() != 0 || !producers.isShutdown()) {
        String address = subQueue.poll(1, TimeUnit.SECONDS);
        if (address != null) {
            consumers.submit(new Thread(() -> {
                try {
                    System.out.println("Doing..." + address);
                    doOtherHTTPReqeust(address);
                } catch (Exception e) {
                    System.out.println("Fatal error consuming);
                }
            }));

        } else {
            System.out.println("Null");
        }
    }

    consumers.shutdown();
}

任何和所有的帮助都将不胜感激。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-05-02 21:26:20

代码语言:javascript
复制
 while (subQueue.size() != 0 || !producers.isShutdown()) {

首先,!producers.isShutdown()将始终返回!true,因为它是在producers.shutdown()之后检查的。isShutdown没有说明池中的任务是否仍在运行,而是如果池已经关闭,导致无法接受新任务。在您的情况下,这将始终是false

其次,虽然您的消费者创建循环和使用者从队列中获取的数据比生产者所能提供的要快得多,但在“生产”过程中,消费者可能已经清除了导致条件subQueue.size() != 0错误的原因。正如您所知,这将打破循环和比特的制作者。

您应该停止使用queue.size(),而应该使用BlockingQueue的阻塞属性。queue.take()将阻塞,直到新元素可用为止。

所以整个流程应该是这样的。

  1. 开始一些生产者任务池,就像你现在正在做的那样。
  2. 让生产者把数据放在阻塞队列中-是的,你在这里
  3. 开始一些(我可以说是固定的)消费者数量
  4. 让消费者从队列中获取queue.take()数据。这将迫使消费者对新数据进行“自动测试”,并在新数据可用时加以利用。

我将不提创建200个线程是疯狂的,并且忽略了多线程使用者/生产者/任务池的全部目的,至少在您的例子中是IMHO。这样做的想法是使用少量的线程,因为它们是重量级的,可以执行大量排队的任务。但这是不同时间的讨论。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50142239

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档