首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用SheduledExecutor blockingQueue触发blockingQueue

用SheduledExecutor blockingQueue触发blockingQueue
EN

Stack Overflow用户
提问于 2016-08-21 15:34:15
回答 2查看 1.2K关注 0票数 0

我目前正在开发java应用程序,该应用程序有多个生产者将任务添加到队列中的场景,并且在队列不是空的情况下,应该以预定义的速度执行任务。(使用多个线程来保持执行率)在执行可用任务后,执行器必须等待队列中的任务再次可用。

我知道blockingQueue可以用来触发这里的部分和ScheduledExecutorService,以固定的速度执行任务。但为了我的需要,我找不到把这两者联系起来的方法。所以,如果你能给我任何建议来实现这一点,我将非常感激。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-04-15 10:16:02

这就是我可以想出的解决办法。它看上去有点生疏,但我已经测试过了,代码正在工作。

代码语言:javascript
复制
package test;

import java.util.concurrent.*;

public class FixedRateConsumer {

private BlockingQueue<String> queue = new ArrayBlockingQueue<>(20);

private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5);

private boolean continueRunning = true;

public void executeInBackGraound() throws InterruptedException, ExecutionException {
    while (continueRunning) {
        String s = queue.take();
        Worker w = new Worker(s);
        ScheduledFuture future = executorService.scheduleAtFixedRate(w, 0, 1, TimeUnit.SECONDS);
        w.future = future;

        try {
            if (!future.isDone()) {
                future.get();
            }
        } catch (CancellationException e) {
            // Skipping
        }
    }
}

public void setContinueRunning(boolean state) {
    continueRunning = state;
}

public void addConsumableObject(String s) throws InterruptedException {
    queue.put(s);
}

private void consumeString(String s) {
    System.out.println("Consumed -> " + s + ", ... @ -> "  + System.currentTimeMillis() + " ms");
}

private class Worker implements Runnable {
    String consumableObject;
    ScheduledFuture future;

    public Worker(String initialConsumableObject) {
        this.consumableObject = initialConsumableObject;
    }

    @Override
    public void run() {
        try {
            if (consumableObject == null) {
                consumableObject = queue.take();
            }

            consumeString(consumableObject);

            consumableObject = null;
            if (queue.isEmpty()) {
                if (future == null) {
                    while (future == null) {
                        Thread.sleep(50);
                    }
                }

                future.cancel(false);
            }

        } catch (Exception e) {
            System.out.println("Exception : " + e);
        }
    }
}
}
票数 0
EN

Stack Overflow用户

发布于 2016-08-21 17:59:50

您需要任务队列可以由生产者和使用者线程访问。我编写了一个基本程序来演示这一点,但我将允许您根据需要使用BlockingQueue API和ScheduledExecutor

代码语言:javascript
复制
import java.util.concurrent.*;


public class ProducerConsumer {
    private static final BlockingQueue<Integer> taskQueue = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        ExecutorService consumers = Executors.newFixedThreadPool(3);
        consumers.submit(new Consumer());
        consumers.submit(new Consumer());
        consumers.submit(new Consumer());

        ExecutorService producers = Executors.newFixedThreadPool(2);
        producers.submit(new Producer(1));
        producers.submit(new Producer(2));
    }

    private static class Producer implements Runnable {
        private final int task;

        Producer(int task) {
            this.task = task;
        }

        @Override
        public void run() {
            System.out.println("Adding task: " + task);
            taskQueue.add(task); // put is better, since it will block if queue is full
        }
    }

    private static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                Integer task = taskQueue.take(); // block if there is no task available
                System.out.println("Executing task: " + task);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39065918

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档