有人能指出为什么这段代码会导致死锁吗?这是一个单一生产者,多个消费者的问题。生产者有8个缓冲区。在这里它有4个消费者。每个消费者将有两个缓冲区。当一个缓冲区被填满时,它会将其标记为准备使用并切换到第二个缓冲区。然后,消费者可以处理此缓冲区。完成后,它将缓冲区返回给生产者。
消费者0的缓冲区0-1消费者1的缓冲区2-3消费者2的缓冲区4-5消费者3的缓冲区6-7
程序偶尔会进入死锁状态。可以理解的是,由于标志只能处于一种状态,即0或1,因此至少消费者或生产者都可以继续。如果继续,它最终将解锁死锁。
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
const int BUFFERSIZE = 100;
const int row_size = 10000;
class sharedBuffer
{
public:
int B[8][BUFFERSIZE];
volatile int B_STATUS[8];
volatile int B_SIZE[8];
sharedBuffer()
{
for (int i=0;i<8;i++)
{
B_STATUS[i] = 0;
B_SIZE[i] = 0;
for (int j=0;j<BUFFERSIZE;j++)
{
B[i][j] = 0;
}
}
}
};
class producer
{
public:
sharedBuffer * buffer;
int data[row_size];
producer(sharedBuffer * b)
{
this->buffer = b;
for (int i=0;i<row_size;i++)
{
data[i] = i+1;
}
}
void produce()
{
int consumer_id;
for(int i=0;i<row_size;i++)
{
consumer_id = data[i] % 4;
while(true)
{
if (buffer->B_STATUS[2*consumer_id] ==1 && buffer->B_STATUS[2*consumer_id + 1] == 1)
continue;
if (buffer->B_STATUS[2*consumer_id] ==0 )
{
buffer->B[2*consumer_id][buffer->B_SIZE[2*consumer_id]++] = data[i];
if(buffer->B_SIZE[2*consumer_id] == BUFFERSIZE || i==row_size -1)
{
buffer->B_STATUS[2*consumer_id] =1;
}
break;
}
else if (buffer->B_STATUS[2*consumer_id+1] ==0 )
{
buffer->B[2*consumer_id+1][buffer->B_SIZE[2*consumer_id+1]++] = data[i];
if(buffer->B_SIZE[2*consumer_id+1] == BUFFERSIZE || i==row_size -1)
{
buffer->B_STATUS[2*consumer_id+1] =1;
}
break;
}
}
}
//some buffer is not full, still need set the flag to 1
for (int i=0;i<8;i++)
{
if (buffer->B_STATUS[i] ==0 && buffer->B_SIZE[i] >0 )
buffer->B_STATUS[i] = 1;
}
cout<<"Done produce, wait the data to be consumed\n";
while(true)
{
if (buffer->B_STATUS[0] == 0 && buffer->B_SIZE[0] == 0
&& buffer->B_STATUS[1] == 0 && buffer->B_SIZE[1] == 0
&& buffer->B_STATUS[2] == 0 && buffer->B_SIZE[2] == 0
&& buffer->B_STATUS[3] == 0 && buffer->B_SIZE[3] == 0
&& buffer->B_STATUS[4] == 0 && buffer->B_SIZE[4] == 0
&& buffer->B_STATUS[5] == 0 && buffer->B_SIZE[5] == 0
&& buffer->B_STATUS[6] == 0 && buffer->B_SIZE[6] == 0
&& buffer->B_STATUS[7] == 0 && buffer->B_SIZE[7] == 0 )
{
for (int i=0;i<8;i++)
buffer->B_STATUS[i] = 2;
break;
}
}
};
};
class consumer
{
public:
sharedBuffer * buffer;
int sum;
int index;
consumer(int id, sharedBuffer * buf){this->index = id;this->sum = 0;this->buffer = buf;};
void consume()
{
while(true)
{
if (buffer->B_STATUS[2*index] ==0 && buffer->B_STATUS[2*index+1] ==0 )
continue;
if (buffer->B_STATUS[2*index] ==2 && buffer->B_STATUS[2*index+1] ==2 )
break;
if (buffer->B_STATUS[2*index] == 1)
{
for (int i=0;i<buffer->B_SIZE[2*index];i++)
{
sum+=buffer->B[2*index][i];
}
buffer->B_STATUS[2*index]=0;
buffer->B_SIZE[2*index] =0;
}
if (buffer->B_STATUS[2*index+1] == 1)
{
for (int i=0;i<buffer->B_SIZE[2*index+1];i++)
{
sum+=buffer->B[2*index+1][i];
}
buffer->B_STATUS[2*index+1]=0;
buffer->B_SIZE[2*index+1] =0;
}
}
printf("Sum of consumer %d = %d \n",index,sum);
};
};
int main()
{
sharedBuffer b;
producer p(&b);
consumer c1(0,&b),c2(1,&b),c3(2,&b),c4(3,&b);
thread p_t(&producer::produce,p);
thread c1_t(&consumer::consume,c1);
thread c2_t(&consumer::consume,c2);
thread c3_t(&consumer::consume,c3);
thread c4_t(&consumer::consume,c4);
p_t.join();c1_t.join();c2_t.join();c3_t.join();c4_t.join();
}发布于 2015-07-24 07:01:02
这在很多方面都有缺陷。编译器可以重新排序您的指令,不同的CPU内核可能会看到不同顺序的内存操作。
基本上,你的生产者是这样做的:
你的消费者会这样做:
这是不起作用的,有几个原因。
这可能会导致各种各样的问题--数据损坏、死锁、段错误,这取决于您的代码到底做了什么。我还没有充分分析您的代码来告诉您这会导致死锁的确切原因,但我一点也不感到惊讶。
请注意,'volatile‘关键字对于这种类型的同步是完全无用的。‘'volatile’只适用于信号处理(unix信号),而不适用于多线程代码。
正确的方法是使用适当的同步(例如互斥锁)或原子操作(例如std:: atomic )。他们有各种不同的保证,以确保上述问题不会发生。
如果速度不是最重要的,Mutexes通常更容易使用。原子操作可以让您获得更多的控制,但它们的使用非常棘手。
我建议您使用互斥锁来执行此操作,然后分析程序,然后仅在速度不够快的情况下才执行原子操作。
valgrind是一个很棒的工具,可以用来调试多线程程序(它可以指出不同步的内存访问等等)。
发布于 2015-07-24 13:13:12
谢谢你的有用的评论。我认为如果确保所有的标志/状态值都是从内存中读取的,而不是从寄存器/缓存中读取的,那么无论编译器如何重新组织指令,死锁都应该不会发生。volatile关键字应强制执行此操作。看起来我的理解是错的。
另一件令人费解的事情是,我以为状态变量的值应该只是(0,1,2)中的一个,但有一次,我看到了像5384这样的值。不知何故,数据被破坏了。
https://stackoverflow.com/questions/31599275
复制相似问题