我有一个非常基本的线程池代码。它调用一个包含在链接块队列中的工作对象池。代码只是通过重新循环worker对象来输出输入数据。
我发现在以下几个方面存在一致的僵局/冻结:
public class throttleheapthreadpool{
private quoteworkerobject[] channels;
private LinkedBlockingQueue<quoteworkerobject> idlechannels;
public throttleheapthreadpool(int poolsize,int stocks){
channels=new quoteworkerobject[poolsize];
idlechannels=new LinkedBlockingQueue<quoteworkerobject>();
for(int i=1;i<poolsize;i++){
channels[i]=new quoteworkerobject(idlechannels);
idlechannels.add(channels[i]);//All WORKERS to Idle pool to start
}
}
public void execute(Integer quote){
quoteworkerobject current = null;
try {
//extract worker from pool
current = (quoteworkerobject)idlechannels.take();
current.put(quote);
} catch (InterruptedException e) {
}
}
class quoteworkerobject{
LinkedBlockingQueue<Integer> taskqueue=new LinkedBlockingQueue<Integer>();
Thread quotethread=null;
LinkedBlockingQueue<quoteworkerobject> idle=null;
@SuppressWarnings("unchecked")
public quoteworkerobject(LinkedBlockingQueue<quoteworkerobject> idlechannels){
this.idle=idlechannels;
Runnable r=new Runnable(){
public void run() {
insertquote();
}
};
quotethread=new Thread(r);
quotethread.start();//spawn a thread from the worker
}
public void put(Integer quote){
taskqueue.add(quote);
}
public void insertquote(){
try{
Integer thisquote=taskqueue.take();
idle.add(this);
}
catch(Exception ex){
}
}
}
public static void main(String[] args){
throttleheapthreadpool pool=new throttleheapthreadpool(5,200);
Random randomGenerator = new Random();
for(int node=0;node < 20;node++){
int d=randomGenerator.nextInt(5*200);
pool.execute(d);
}
}
} 此代码在第8次执行时始终冻结--在当前=(商工作对象)idlechannels.take()处;
上面有什么问题吗?
发布于 2012-08-07 08:40:18
这就是我讨厌的原因?不喜欢使用这样的代码。你应该考虑让你/我们的生活变得更容易,并且写一些代码,即使过了几个月,你也可以看到并被证明:相应地命名变量,写简短的文档或解释等等。我花了25分钟重构,因为我不知道发生了什么。
我添加了一个小的重构,我还添加了一些断点,看看代码-解释在里面。但问题在于insertQuote方法--它完成得太早了。
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
public class Pool {
private Worker[] workers;
private LinkedBlockingQueue<Worker> workerQueue;
/**
* Create a pool of 5 workers and a {@link LinkedBlockingQueue} to store them
*/
public Pool(int poolsize) {
//1. First you get here : you create a Pool of 5 Worker Threads and a Queue to store them
System.out.println("1.");
workers = new Worker[poolsize];
workerQueue = new LinkedBlockingQueue<Worker>();
for (int i = 0; i < poolsize; i++) {
//2. You instantiate 5 worker Threads and place each of them on the Queue
System.out.println("2.");
workers[i] = new Worker(workerQueue);
workerQueue.add(workers[i]);
}
}
public void execute(Integer quote) {
Worker current = null;
try {
// extract worker from pool
//6. Get a worker from the Queue
System.out.println("6.");
current = workerQueue.take();
current.put(quote);
} catch (InterruptedException e) {
}
}
/**
*
*
*/
class Worker {
LinkedBlockingQueue<Integer> taskqueueForEachWorker = new LinkedBlockingQueue<Integer>();
LinkedBlockingQueue<Worker> workerQueue = null;
public Worker(LinkedBlockingQueue<Worker> idlechannels) {
new Thread(new Runnable() {
@Override
public void run() {
//3. You call the insert quote method
System.out.println("3.");
insertquote();
}
}).start();
}
public void put(Integer quote) {
//7. Add a task for each Thread to do
System.out.println("7.");
taskqueueForEachWorker.add(quote);
}
//TODO The problem is here: After you execute this line : workerQueue.add(this); this method ends, NO MORE worker Threads are put on the queue,
// thus at point 6 you block, well because there are no more worker Threads an no one add them.
public void insertquote() {
try {
// 4. You try to take an Integer from the Pool of tasks from rach Thread, but there is nothing yet - it is empty, thus each Thread (worker)
// blocks here, waiting for a task
System.out.println("4.");
Integer thisquote = taskqueueForEachWorker.take(); // This will successed only after 7.
workerQueue.add(this);
} catch (Exception ex) {
}
}
}
public static void main(String[] args) {
Pool pool = new Pool(5);
Random randomGenerator = new Random();
for (int node = 0; node < 20; node++) {
int d = randomGenerator.nextInt(5 * 200);
System.out.println("5.");
pool.execute(d);
}
}}
输出将为1.2.3.4.2.3.3.4.2.3.2.2.3.4.4.5.6.6.7.5.6.6.6.6.6.6.6.6.6.6.7.7.6.7.6.7.7.6.7.7.6.7.6.7.5.6.
请确保最后一行为6。如果由于insertQuote方法已退出,因此队列现在为空,则这里将阻塞所有的worker线程。
而且,在我看来,由于您的work线程每个都使用一个分离的队列,所以您应该实现“工作窃取”模式,即Deque。也调查一下。
发布于 2012-08-06 17:39:00
请参阅下面的重构(仍然不完美,但可读性稍强)。你的问题如下:
throttleheapthreadpool的构造函数中从1循环到5)insertquote 一次并返回到空闲池。因此,总的来说,您提交了4个已完成的作业,然后工人返回到队列中,然后给他们额外的4个作业(总共8个),除非他们不使用作业,因为他们的insertquote方法已经退出。
解决方案:在while循环中运行insertquote:
public void insertquote() {
try {
while (true) {
taskqueue.take();
idle.add(this);
}
} catch (Exception ex) {
}
}有关信息,以下是我当前版本的代码:
public class ThrottleHeapThreadPool {
private final BlockingQueue<QuoteWorkerObject> idlechannels = new LinkedBlockingQueue<QuoteWorkerObject>();
public static void main(String[] args) {
ThrottleHeapThreadPool pool = new ThrottleHeapThreadPool(5, 200);
Random randomGenerator = new Random();
for (int node = 0; node < 20; node++) {
int d = randomGenerator.nextInt(5 * 200);
pool.execute(d);
}
}
public ThrottleHeapThreadPool(int poolsize, int stocks) {
for (int i = 1; i < poolsize; i++) {
QuoteWorkerObject worker = new QuoteWorkerObject(idlechannels);
idlechannels.add(worker);//All WORKERS to Idle pool to start
worker.init();
}
}
public void execute(Integer quote) {
try {
//extract worker from pool
QuoteWorkerObject worker = idlechannels.take();
worker.put(quote);
} catch (InterruptedException e) {
}
}
class QuoteWorkerObject {
private final BlockingQueue<Integer> taskqueue = new LinkedBlockingQueue<Integer>();
private final BlockingQueue<QuoteWorkerObject> idle;
@SuppressWarnings("unchecked")
public QuoteWorkerObject(BlockingQueue<QuoteWorkerObject> idlechannels) {
this.idle = idlechannels;
}
public void init() {
new Thread(new Runnable() {
public void run() {
insertquote();
}
}).start();
}
public void put(Integer quote) {
taskqueue.add(quote);
}
public void insertquote() {
try {
while (true) {
taskqueue.take();
idle.add(this);
}
} catch (Exception ex) {
}
}
}
}https://stackoverflow.com/questions/11832770
复制相似问题