我在实践中阅读Java并发时,遇到了以下代码片段。清单12.5 https://jcip.net/listings/PutTakeTest.java
// Listing 12.5. Producer-consumer test program for BoundedBuffer.
package net.jcip.examples;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import junit.framework.TestCase;
/**
* PutTakeTest
* <p/>
* Producer-consumer test program for BoundedBuffer
*
* @author Brian Goetz and Tim Peierls
*/
public class PutTakeTest extends TestCase {
protected static final ExecutorService pool = Executors.newCachedThreadPool();
protected CyclicBarrier barrier;
protected final SemaphoreBoundedBuffer<Integer> bb;
protected final int nTrials, nPairs;
protected final AtomicInteger putSum = new AtomicInteger(0);
protected final AtomicInteger takeSum = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
new PutTakeTest(10, 10, 100000).test(); // sample parameters
pool.shutdown();
}
public PutTakeTest(int capacity, int npairs, int ntrials) {
this.bb = new SemaphoreBoundedBuffer<Integer>(capacity);
this.nTrials = ntrials;
this.nPairs = npairs;
this.barrier = new CyclicBarrier(npairs * 2 + 1);
}
void test() {
try {
for (int i = 0; i < nPairs; i++) {
pool.execute(new Producer());
pool.execute(new Consumer());
}
barrier.await(); // wait for all threads to be ready
barrier.await(); // wait for all threads to finish
assertEquals(putSum.get(), takeSum.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
static int xorShift(int y) {
y ^= (y << 6);
y ^= (y >>> 21);
y ^= (y << 7);
return y;
}
class Producer implements Runnable {
public void run() {
try {
int seed = (this.hashCode() ^ (int) System.nanoTime());
int sum = 0;
barrier.await();
for (int i = nTrials; i > 0; --i) {
bb.put(seed);
sum += seed;
seed = xorShift(seed);
}
putSum.getAndAdd(sum);
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
class Consumer implements Runnable {
public void run() {
try {
barrier.await();
int sum = 0;
for (int i = nTrials; i > 0; --i) {
sum += bb.take();
}
takeSum.getAndAdd(sum);
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}我发现很难理解如何在主线程或runnable线程中第二次调用循环屏障。根据我的理解,循环屏障将阻塞线程,直到在所有线程上调用await,并且屏障计数与传递给构造函数的值匹配。当第一次在线程上等待屏障时,循环屏障中的等待计数将是(n对*2+ 1)所需值的一半。控件如何在生产者和消费者中执行put sum和take sum计算,并在主线程上连续执行?
如果这个问题听起来很天真,请提前道歉。
发布于 2020-09-28 08:54:40
主线程在屏障上调用await之前启动npairs生产者和npairs消费者。每个if生产者和消费者调用await,因此它与主线程一起允许所有线程通过屏障。
https://stackoverflow.com/questions/64080960
复制相似问题