我刚刚开始学习多线程。我已经写了一个简单的应用程序。应用程序创建了三个线程。两个线程写,一个线程读。写入器线程写入全局数组中的单独位置。写入线程在递增数组中的值之后通知读取器。然后,读线程递减数组中的值,并再次等待写线程更新数组中的相应值。应用程序的代码粘贴在下面。
我看到的是编写器(生产者)线程比读取器(消费者)线程获得更多的时间片。我想我做错了什么。如果应用程序的输出被重定向到一个文件,那么可以观察到有更多来自生产者的连续消息,而来自消费者的消息很少出现。我所期望的是,当生产者更新其数据时,消费者立即处理它,即在每个生产者消息之后应该打印消费者消息。
感谢并致以问候
~插头
#include <stdio.h>
#include <pthread.h>
const long g_lProducerCount = 2; /*Number of Producers*/
long g_lProducerIds[2]; /*Producer IDs = 0, 1...*/
long g_lDataArray[2]; /*Data[0] for Producer 0, Data[1] for Producer 1...*/
/*Producer ID that updated the Data. -1 = No update*/
long g_lChangedProducerId = -1;
pthread_cond_t g_CondVar = PTHREAD_COND_INITIALIZER;
pthread_mutex_t g_Mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_t g_iThreadIds[3]; /*3 = 2 Producers + 1 Consumer*/
unsigned char g_bExit = 0; /*Exit application? 0 = No*/
void* Producer(void *pvData)
{
long lProducerId = *(long*)pvData; /*ID of this Producer*/
while(0 == g_bExit) {
pthread_mutex_lock(&g_Mutex);
/*Tell the Consumer who's Data is updated*/
g_lChangedProducerId = lProducerId;
/*Update the Data i.e. Increment*/
++g_lDataArray[lProducerId];
printf("Producer: Data[%ld] = %ld\n",
lProducerId, g_lDataArray[lProducerId]);
pthread_cond_signal(&g_CondVar);
pthread_mutex_unlock(&g_Mutex);
}
pthread_exit(NULL);
}
void* Consumer(void *pvData)
{
while(0 == g_bExit) {
pthread_mutex_lock(&g_Mutex);
/*Wait until one of the Producers update it's Data*/
while(-1 == g_lChangedProducerId) {
pthread_cond_wait(&g_CondVar, &g_Mutex);
}
/*Revert the update done by the Producer*/
--g_lDataArray[g_lChangedProducerId];
printf("Consumer: Data[%ld] = %ld\n",
g_lChangedProducerId, g_lDataArray[g_lChangedProducerId]);
g_lChangedProducerId = -1; /*Reset for next update*/
pthread_mutex_unlock(&g_Mutex);
}
pthread_exit(NULL);
}
void CreateProducers()
{
long i;
pthread_attr_t attr;
pthread_attr_init(&attr);
for(i = 0; i < g_lProducerCount; ++i) {
g_lProducerIds[i] = i;
pthread_create(&g_iThreadIds[i + 1], &attr,
Producer, &g_lProducerIds[i]);
}
pthread_attr_destroy(&attr);
}
void CreateConsumer()
{
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_create(&g_iThreadIds[0], &attr, Consumer, NULL);
pthread_attr_destroy(&attr);
}
void WaitCompletion()
{
long i;
for(i = 0; i < g_lProducerCount + 1; ++i) {
pthread_join(g_iThreadIds[i], NULL);
}
}
int main()
{
CreateProducers();
CreateConsumer();
getchar();
g_bExit = 1;
WaitCompletion();
return 0;
}发布于 2011-07-01 07:20:58
你必须弄清楚你到底想要实现什么。目前,生产者只增加一个整数,而消费者减少该值。这不是一个非常有用的活动;)我知道这只是一个测试应用程序,但仍然不够清楚这个处理的目的是什么,约束是什么等等。
生产者生产一些“产品”。此生产的结果表示为整数值。0表示没有物品,1表示有挂起的物品,消费者可以使用。是那么回事吗?现在,生产者是否有可能在其中任何一个被消费之前生产几个项目(将数组单元增加到一个大于1的值)?或者,他是否必须等待最后一件物品被消费,然后才能将下一件物品放入仓库?存储空间是有限制的还是无限制的?如果它是有限的,那么这个限制是在所有生产者之间共享的,还是每个生产者都定义的?
我所期望的是,当生产者更新其数据时,消费者立即处理它,即在每个生产者消息之后应该打印消费者消息。
虽然不是很清楚你想要实现什么,但我会坚持这句话,并假设如下:每个生产者有一个项目的限制,生产者必须等待消费者清空存储空间,然后才能将新项目放入单元格中,即g_lDataArray中唯一允许的值是0和1。
为了允许线程之间的最大并发,您需要为g_lDataArray的每个单元(对于每个生产者)提供一个条件变量/互斥锁对。你还需要一个更新队列,这是一个提交了他们的工作的生产者列表和一个条件变量/互斥锁对来保护它,这将取代g_lChangedProducerId,它一次只能保存一个值。
每当生产者想要将项目放入存储区时,它必须获取相应的锁,检查存储区是否为空(g_lDataArraylProducerId == 0),如果不是,则等待条件变量,然后递增单元,释放持有的锁,获取消费者锁,将其id添加到更新队列,通知消费者,释放消费者锁。当然,如果生产者要执行生成某些真实项的任何实际计算,则在尝试将该项放入存储区之前,应在任何锁的作用域之外执行此工作。
在伪代码中,这看起来像这样:
// do some computations
item = compute();
lock (mutexes[producerId]) {
while (storage[producerId] != 0)
wait(condVars[producerId]);
storage[producerId] = item;
}
lock (consumerMutex) {
queue.push(producerId);
signal(consumerCondVar);
} 消费者应该如下操作:获取他的锁,检查是否有任何待处理的更新,如果没有等待条件变量,则从队列中取出一个更新(即更新生产者的编号),获取将要处理更新的生产者的锁,递减单元,通知生产者,释放生产者的锁,释放他的锁,最后处理更新。
lock (consumerMutex) {
while (queue.isEmpty())
wait(consumerCondVar);
producerId = queue.pop();
lock (mutexex[producerId]) {
item = storage[producerId];
storage[producerId] = 0;
signal(condVars[producerId]);
}
}
//process the update
process(item);希望这个答案就是你所需要的。
发布于 2011-06-30 08:22:57
问题可能是所有生产者都会更改g_lChangedProducerId,因此一个生产者写入的值可能会在消费者看到它之前被另一个生产者覆盖。
这意味着消费者实际上看不到第一个生产者已经产生了一些输出。
发布于 2013-04-02 15:44:17
当你制作的时候,它可能会唤醒ProThread或者ConThread。如果它唤醒了ProThread,生产者就会再次生产,而ConThread在数据生产后不会立即消耗。这是你不想看到的。您所需要做的就是确保它在生成时不会唤醒ProThread。这里有一种解决方案
void* Producer(void *pvData)
{
........
//wait untill consumer consume its number
while(-1!=g_lChangedProducerId)
pthread_cond_wait(&g_CondVar,&g_Mutex);
//here to inform the consumer it produced the data
g_lChangedProducerId = lProducerId;
........
}
void* Consumer(void *pvData)
{
g_lChangedProducerId = -1;
**//wake up the producer when it consume
pthread_cond_signal(&g_CondVar);**
pthread_mutex_unlock(&g_Mutex);
}https://stackoverflow.com/questions/6528661
复制相似问题