首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >具有最小和最大阈值设置的池填充服务

具有最小和最大阈值设置的池填充服务
EN

Stack Overflow用户
提问于 2015-09-02 16:12:53
回答 3查看 200关注 0票数 2

我想要做的应用程序有一个池(无限)的一些对象。当池的大小达到最低阈值时,我希望启动池填充服务并生成一定数量的新对象(直到达到最大阈值)。

这是producer-consumer问题的稍微修改过的版本,但不知怎么的,我被卡住了。由于创建大小有限的BlockingQueue并保持其填充很容易,我不知道如何解决我的问题。

我尝试使用ReentrantLock,它是Condition对象,但是Condition#signal()方法要求我在锁中,这在我的情况下是完全不必要的。

在我看来,最好的解决方案应该是类似于CountDownLatch的东西。消费者将减少计数器,并最终触发池填充服务。这里的问题是CountDownLatch无法重新启动自己。

有什么想法吗?

换句话说:我有一桶消费者线程,其中一条是生产者。生产者应该等待,直到达到最小阈值,产生一些对象,然后再等待。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2015-09-03 00:50:55

Semaphore可以作为生产者的障碍,并且可以重复使用.当SemaphoreAtomicBoolean相结合时,生产者可以在不影响消费者的情况下工作。它确实需要池来处理填充逻辑。

在下面的实现中,生产者立即开始填充池,然后等待池达到最小大小。

代码语言:javascript
复制
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// http://stackoverflow.com/q/32358084/3080094
public class RandomWell {

    public static void main(String[] args) {

        try {

            final FilledPool<Integer> pool = new FilledPool<Integer>(100, 1000);
            final CountDownLatch syncStart = new CountDownLatch(3);

            Thread consumer = new Thread() {
                @Override public void run() {
                    // just to do something, keep track of amount of positive ints from pool
                    int positiveInt = 0;
                    int totalInts = 0;
                    try {
                        syncStart.countDown();
                        syncStart.await();
                        for(;;) {
                            int i = pool.take();
                            if (i > 0) {
                                positiveInt++;
                            }
                            totalInts++;
                            Thread.yield();
                        }
                    } catch (InterruptedException e) {
                        System.out.println("Consumer stopped: " + positiveInt + " / " + (totalInts - positiveInt));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            consumer.start();

            Thread producer = new Thread() {
                @Override public void run() {
                    try {
                        Random r = new Random();
                        syncStart.countDown();
                        syncStart.await();
                        for(;;) {
                            int fillTotal = 0;
                            while (!pool.isMinFilled()) {
                                int fill = pool.getFillSize();
                                for (int i = 0; i < fill; i++) {
                                    pool.offer(r.nextInt());
                                }
                                fillTotal += fill;
                                // System.out.println("Pool size: " + pool.sizeFast());
                            }
                            System.out.println("Filled " + fillTotal);
                            pool.awaitNewFilling();
                        }
                    } catch (InterruptedException e) {
                        System.out.println("Producer stopped.");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            producer.start();
            syncStart.countDown();
            syncStart.await();

            Thread.sleep(100);

            producer.interrupt();
            consumer.interrupt();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static class FilledPool<E> {

        private final LinkedBlockingQueue<E> pool;
        private final int minSize;
        private final int maxSize;
        private final Semaphore needFilling = new Semaphore(0);

        // producer starts filling initially
        private final AtomicBoolean filling = new AtomicBoolean(true);

        public FilledPool(int minSize, int maxSize) {
            super();
            this.minSize = minSize;
            this.maxSize = maxSize;
            pool = new LinkedBlockingQueue<E>();
        }

        public E take() throws InterruptedException {

            triggerFilling();
            E e = pool.take();
            return e;
        }

        private void triggerFilling() {

            if (!isFilling() && !isMinFilled() && filling.compareAndSet(false, true)) {
                needFilling.release();
                System.out.println("Filling triggered.");
            }
        }

        public void offer(E e) { pool.offer(e); }

        public void awaitNewFilling() throws InterruptedException {

            // must check on minimum in case consumers outpace producer
            if (isMinFilled()) {
                filling.set(false);
                needFilling.acquire();
            }
        }

        public int size() { return pool.size(); }
        public boolean isMinFilled() { return minSize < size(); }
        public int getFillSize() { return maxSize - size(); } 
        public boolean isFilling() { return filling.get(); }
    }
}

更新:我还成功地使用了ConcurrentLinkedQueue,而不是LinkedBlockingQueue,后者使吞吐量增加了一倍,但同时也使代码的复杂性增加了一倍。

票数 1
EN

Stack Overflow用户

发布于 2015-09-02 16:35:27

这里有个方法。它使用ReentrantLock.tryLock()来确保只填充单个线程,并且只在低于阈值的情况下这样做。

这里发布的示例代码将运行30秒,然后停止。

代码语言:javascript
复制
package test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Filler {

    public static void main(String... args) throws InterruptedException {

        final LinkedBlockingDeque<Object> linkedBlockingDeque = new LinkedBlockingDeque<>();
        final ExecutorService executorService = Executors.newFixedThreadPool(4);
        final Lock fillLock = new ReentrantLock();
        final AtomicBoolean stop = new AtomicBoolean(false);

        for (int i = 0; i < 4; i++) {
            executorService.execute(new Worker(linkedBlockingDeque, fillLock, stop));
        }

        Thread.sleep(TimeUnit.SECONDS.toMillis(30));
        stop.set(true);
        executorService.shutdown();
        executorService.awaitTermination(30, TimeUnit.SECONDS);

    }

}

class Worker implements Runnable {

    private final LinkedBlockingDeque<Object> linkedBlockingDeque;
    private final Lock fillLock;
    private final AtomicBoolean stop;

    Worker(LinkedBlockingDeque<Object> linkedBlockingDeque, Lock fillLock, AtomicBoolean stop) {

        this.linkedBlockingDeque = linkedBlockingDeque;
        this.fillLock = fillLock;
        this.stop = stop;
    }

    @Override
    public void run() {

        try {
            while (!stop.get()) {
                Object o = linkedBlockingDeque.poll(1, TimeUnit.SECONDS);
                if (o != null) {
                    handle(o);
                }
                if (linkedBlockingDeque.size() < 10) {
                    tryFill();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }

    protected void tryFill() {

        if (fillLock.tryLock()) {
            try {
                System.out.println("Filling");
                for (int i = 0; i < 100; i++) {
                    linkedBlockingDeque.add(new Object());
                }

            } finally {
                fillLock.unlock();
            }
        }
    }

    protected void handle(Object object) {

        System.out.printf("object: %s\n", object);
        //TODO: blah blah blah stuff

    }

}

如果您希望其他线程在队列填充时阻塞,请让它们按顺序等待锁,如果无法将其填满。

例如,将tryFill()改为如下所示:

受保护的空tryFill() {

代码语言:javascript
复制
if (fillLock.tryLock()) {
    try {
        System.out.println("Filling");
        for (int i = 0; i < 100; i++) {
            linkedBlockingDeque.add(new Object());
        }

    } finally {
        fillLock.unlock();
    }
} else {
    fillLock.lock();
    try {

    } finally {
        fillLock.unlock();
    }
}

}

或者,你可以用一个条件来做这个--我会把它作为一个练习留给操作。

票数 0
EN

Stack Overflow用户

发布于 2015-09-02 21:21:04

我认为可以使用另一个BlockingQueue来向制片人发出信号,是时候唤醒并填充池了。如下所示:

代码语言:javascript
复制
package com.stackoverflow;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class Main {

    public static final int LOWER_BOUND = 42;
    public static final int UPPER_BOUND = 84;
    public static final int CONSUMERS_COUNT = 10;

    public static void main(String[] args) {

        BlockingQueue<Object> pool = new LinkedBlockingQueue<>();
        AtomicInteger currentPoolSize = new AtomicInteger(0);
        BlockingQueue<Object> commandsForProducer = new LinkedBlockingQueue<>();

        Thread producer = new Thread(new Producer(pool, currentPoolSize, commandsForProducer));
        producer.start();
        for (int i = 0; i < CONSUMERS_COUNT; i++) {
            Thread consumer = new Thread(new Consumer(pool, currentPoolSize, commandsForProducer));
            consumer.start();
        }
    }

}

class Producer implements Runnable {

    private BlockingQueue<Object> pool;
    private AtomicInteger currentPoolSize;
    private BlockingQueue<Object> commandsForProducer;

    Producer(BlockingQueue<Object> pool, AtomicInteger currentPoolSize, BlockingQueue<Object> commandsForProducer) {
        this.pool = pool;
        this.currentPoolSize = currentPoolSize;
        this.commandsForProducer = commandsForProducer;
    }

    @Override
    public void run() {
        while (true) {
            if (currentPoolSize.get() < Main.UPPER_BOUND){
                pool.add(new Object());
                System.out.println(Thread.currentThread().getName() + " producer, items in pool:" +
                        currentPoolSize.incrementAndGet());
                try {
                    Thread.sleep(4); // Simulating work
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                try {
                    System.out.println(Thread.currentThread().getName() + " producer is trying to sleep");
                    commandsForProducer.take();
                    System.out.println(Thread.currentThread().getName() + " producer awakes");
                    commandsForProducer.clear();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

class Consumer implements Runnable {

    private BlockingQueue<Object> pool;
    private AtomicInteger currentPoolSize;
    private BlockingQueue<Object> commandsForProducer;

    Consumer(BlockingQueue<Object> pool, AtomicInteger currentPoolSize, BlockingQueue<Object> commandsForProducer) {
        this.pool = pool;
        this.currentPoolSize = currentPoolSize;
        this.commandsForProducer = commandsForProducer;
    }

    @Override
    public void run() {
        while (true) {
            if (currentPoolSize.get() <= Main.LOWER_BOUND) {
                System.out.println(Thread.currentThread().getName() + " signaled to producer");
                commandsForProducer.add(new Object());
            }
            try {
                pool.take();
                System.out.println(Thread.currentThread().getName() + " consumer, items in pool:" +
                        currentPoolSize.decrementAndGet());
                Thread.sleep(50); // Simulating work
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/32358084

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档