首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >AtomicBoolean未更新

AtomicBoolean未更新
EN

Stack Overflow用户
提问于 2018-04-09 02:57:59
回答 1查看 433关注 0票数 0

我正在实施一个标准的生产者-消费者计划,这样生产者停止生产200产品后。为了表明这一点,生产者将-1放入BlockingQueue变量,否则该变量总是包含正整数。我的消费者实现如下:

代码语言:javascript
复制
public class Consumer implements Runnable{
private BlockingQueue<Integer> queue;
private AtomicBoolean isProducerClosed = new AtomicBoolean(false);

public Consumer(BlockingQueue<Integer> queue) {
    this.queue = queue;
}

@Override
public void run() {

    try {
        Thread.sleep(50);

        while(!isProducerClosed.get()) {
            try {
                Integer value = queue.take();

                if ((value.intValue() == -1)){
                    System.out.println(Thread.currentThread().getName() + " Encountered -1. Signal to shutdown consumers.");
                    //isProducerClosed.set(true);
                    isProducerClosed.compareAndSet(false, true);
                    break;
                }

                System.out.println(Thread.currentThread().getName() + " consuming : " + value);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }


}

}

备用消费者逻辑(仍然存在相同的问题):

代码语言:javascript
复制
@Override
public void run() {

    while(true) {
        try {
            Thread.sleep(50);
            Integer value = null;

            synchronized (this) {
                if(!isProducerClosed.get()) {
                    value = queue.take();
                    if ((value.intValue() == -1)) {
                        System.out.println(Thread.currentThread().getName() + " Encountered -1. Signal to shutdown consumers.");
                        isProducerClosed.set(true);
                        isProducerClosed = isProducerClosed;
                        System.out.println("Current value of isProducerClosed : " + isProducerClosed.get());
                    }
                }
            }

            if (!isProducerClosed.get()) {
                System.out.println(Thread.currentThread().getName() + " consuming : " + value);
            } else {
                break;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

在执行此操作时,我的使用者线程会卡在queue.take()上,就好像它们在等待队列中的产品可用一样。一种可能是:所有使用者线程同时检查条件isProducerClosed.get(),进入while循环,并访问queue.take()方法。这是正确的假设吗?如果是,是否有任何方法可以不使用低级别synchronized关键字来实现这一点?我对volatile boolean变量做了同样的尝试,结果完全一样。只有在使该变量保持静态之后,我才能够看到所有消费者在队列中遇到-1后被终止(因为var现在是类的)。

我的呼叫代码:

代码语言:javascript
复制
public class ProducerConsumer {

private BlockingQueue<Integer> sharedQueue = new ArrayBlockingQueue<Integer>(10);


public void executeProducerConsumer(int producerCount, int consumerCount){
    ExecutorService executorService = Executors.newFixedThreadPool(3, Executors.defaultThreadFactory());

    for(int i = 0; i < producerCount; i++) {
        executorService.submit(new Producer(sharedQueue));      //producer
    }

    for(int i = 0; i < consumerCount; i++) {
        executorService.submit(new Consumer(sharedQueue));      //i-th consumer.
    }


    //initiates clossure of threads after completion, in async manner.
    executorService.shutdown();

    //wait till all threads are done.
    try {
        executorService.awaitTermination(1, TimeUnit.HOURS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("The End.");
}

}

生产者代码:

代码语言:javascript
复制
public class Producer implements Runnable {
private BlockingQueue<Integer> queue;
private volatile int maxNumberOfItemsToProduce = 10;

public Producer(BlockingQueue<Integer> queue) {
    this.queue = queue;
}

@Override
public void run() {
    Random random = new Random();

    while(true){

        if(maxNumberOfItemsToProduce == 0) {
            try {
                queue.put(-1);
                System.out.println("Terminating Producer after producing max number of products.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            break;
        }

        try {
            queue.put(random.nextInt(300));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


        maxNumberOfItemsToProduce--;
    }

}

}

控制台截图:

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-04-09 03:40:14

一种可能是:所有使用者线程同时检查条件isProducerClosed.get(),进入while循环,并访问queue.take()方法。这是正确的假设吗?如果是,是否有任何方法实现这一点而不使用低级别同步关键字?我用易失性布尔变量做了同样的尝试,结果完全一样。只有在使该变量保持静态之后,我才能够看到所有消费者在队列中遇到-1后被终止(因为var现在是类的)。

是的,这个假设是正确的。

第一个问题是:isProducerClosed不是在消费者之间共享的。它必须在消费者之间共享,这样如果一个消费者设定了它的价值,其他消费者也可以看到它的价值。使其成为static使其共享,因此情况在此之后有所改善。

第二个问题:即使在isProducerClosed被共享之后,您也可能陷入这样一种情况:多个使用者将在一个空队列上执行queue.take() (一个线程可能取最后一个值,而另一个线程在第一个线程将isProducerClosed设置为true之前执行take() )。您将需要同步(例如,使用双重检查)。

示例代码(在消费者以外的部分中仍然包含bug/种族)-

代码语言:javascript
复制
public class TestClass {

    private BlockingQueue<Integer> sharedQueue = new ArrayBlockingQueue<Integer>(10);

    public static void main(String[] args) {
        TestClass t = new TestClass();
        t.executeProducerConsumer(3, 3);
    }

    public void executeProducerConsumer(int producerCount, int consumerCount) {
        ExecutorService executorService = Executors.newFixedThreadPool(producerCount + consumerCount, Executors.defaultThreadFactory());

        for (int i = 0; i < producerCount; i++) {
            executorService.submit(new Producer(sharedQueue));      //producer
        }

        for (int i = 0; i < consumerCount; i++) {
            executorService.submit(new Consumer(sharedQueue));      //i-th consumer.
        }

        //initiates clossure of threads after completion, in async manner.
        executorService.shutdown();

        //wait till all threads are done.
        try {
            executorService.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("The End.");
    }

}

class Consumer implements Runnable {

    private BlockingQueue<Integer> queue;
    private static volatile boolean isProducerClosed = false; // make this static so that it is shared across consumers

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(50);
            Integer value;

            while (!isProducerClosed) {
                try {
                    synchronized (queue) { //synchronize so that only one thread can talk to the queue at a time
                        if (!isProducerClosed) { //double check
                            value = queue.take(); // we can now safely take an item
                            if ((value.intValue() == -1)) {
                                isProducerClosed = true;
                                System.out.println(Thread.currentThread().getName() + " Encountered -1. Signal to shutdown consumers.");
                                break;
                            }
                        } else {
                            System.out.println(Thread.currentThread().getName() + " Last item was taken by some other consumer. Exiting!");
                            break;
                        }
                    }
                    consumeValue(value); //Consume the value outside the lock
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

    private void consumeValue(Integer value) {
        System.out.println(Thread.currentThread().getName() + " Consuming value :" + value);
    }
}

class Producer implements Runnable {

    private BlockingQueue<Integer> queue;
    private static volatile int maxNumberOfItemsToProduce = 10;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        Random random = new Random();

        while (true) {

            if (maxNumberOfItemsToProduce == 0) {
                try {
                    queue.put(-1);
                    System.out.println("Terminating Producer after producing max number of products.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                break;
            }

            try {
                queue.put(random.nextInt(300));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            maxNumberOfItemsToProduce--;
        }

    }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49724948

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档