首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >单一生产者多消费者案例中的死锁

单一生产者多消费者案例中的死锁
EN

Stack Overflow用户
提问于 2015-07-24 06:40:35
回答 2查看 186关注 0票数 0

有人能指出为什么这段代码会导致死锁吗?这是一个单一生产者,多个消费者的问题。生产者有8个缓冲区。在这里它有4个消费者。每个消费者将有两个缓冲区。当一个缓冲区被填满时,它会将其标记为准备使用并切换到第二个缓冲区。然后,消费者可以处理此缓冲区。完成后,它将缓冲区返回给生产者。

消费者0的缓冲区0-1消费者1的缓冲区2-3消费者2的缓冲区4-5消费者3的缓冲区6-7

程序偶尔会进入死锁状态。可以理解的是,由于标志只能处于一种状态,即0或1,因此至少消费者或生产者都可以继续。如果继续,它最终将解锁死锁。

代码语言:javascript
复制
#include <iostream>
#include <thread>
#include <mutex>

using namespace std;

const int BUFFERSIZE = 100;
const int row_size = 10000;
class sharedBuffer
{
public:
    int B[8][BUFFERSIZE];
    volatile int B_STATUS[8];
    volatile int B_SIZE[8];
    sharedBuffer()
    {
        for (int i=0;i<8;i++)
        {
            B_STATUS[i] = 0;
            B_SIZE[i] = 0;
            for (int j=0;j<BUFFERSIZE;j++)
            {
                B[i][j] = 0;
            }
        }
    }

};

class producer
{
public:
    sharedBuffer * buffer;
    int data[row_size];
    producer(sharedBuffer * b)
    {
        this->buffer = b;
        for (int i=0;i<row_size;i++)
        {
            data[i] = i+1;
        }
    }
    void produce()
    {
        int consumer_id;
        for(int i=0;i<row_size;i++)
        {
            consumer_id = data[i] % 4;
            while(true)
            {
                if (buffer->B_STATUS[2*consumer_id] ==1 && buffer->B_STATUS[2*consumer_id + 1] == 1)
                continue;
                if (buffer->B_STATUS[2*consumer_id] ==0 )
                {
                    buffer->B[2*consumer_id][buffer->B_SIZE[2*consumer_id]++] = data[i];
                    if(buffer->B_SIZE[2*consumer_id] == BUFFERSIZE || i==row_size -1)
                    {
                        buffer->B_STATUS[2*consumer_id] =1;
                    }
                    break;  
                }
                else if (buffer->B_STATUS[2*consumer_id+1] ==0 )
                {
                    buffer->B[2*consumer_id+1][buffer->B_SIZE[2*consumer_id+1]++] = data[i];
                                        if(buffer->B_SIZE[2*consumer_id+1] == BUFFERSIZE || i==row_size -1)
                                        {
                                                buffer->B_STATUS[2*consumer_id+1] =1;
                                        }
                                        break;
                } 
            }       
        }
        //some buffer is not full, still need set the flag to 1
        for (int i=0;i<8;i++)
        {
            if (buffer->B_STATUS[i] ==0 && buffer->B_SIZE[i] >0 )
                buffer->B_STATUS[i] = 1;
        }
        cout<<"Done produce, wait the data to be consumed\n";
        while(true)
        {
            if (buffer->B_STATUS[0] == 0 && buffer->B_SIZE[0] == 0 
                && buffer->B_STATUS[1] == 0 && buffer->B_SIZE[1] == 0 
                && buffer->B_STATUS[2] == 0 && buffer->B_SIZE[2] == 0 
                && buffer->B_STATUS[3] == 0 && buffer->B_SIZE[3] == 0
                && buffer->B_STATUS[4] == 0 && buffer->B_SIZE[4] == 0 
                && buffer->B_STATUS[5] == 0 && buffer->B_SIZE[5] == 0 
                && buffer->B_STATUS[6] == 0 && buffer->B_SIZE[6] == 0 
                && buffer->B_STATUS[7] == 0 && buffer->B_SIZE[7] == 0 )             
            {
                for (int i=0;i<8;i++)
                    buffer->B_STATUS[i] = 2;
                break;
            }
        }       
    };

};

class consumer
{
public:
    sharedBuffer * buffer;
    int sum;
    int index;
    consumer(int id, sharedBuffer * buf){this->index = id;this->sum = 0;this->buffer = buf;};
    void consume()
    {
        while(true)
        {
            if (buffer->B_STATUS[2*index] ==0 && buffer->B_STATUS[2*index+1] ==0 )
                continue;
            if (buffer->B_STATUS[2*index] ==2 && buffer->B_STATUS[2*index+1] ==2 )
                                break;
            if (buffer->B_STATUS[2*index] == 1)
            {
                for (int i=0;i<buffer->B_SIZE[2*index];i++)
                {
                    sum+=buffer->B[2*index][i];
                }
                buffer->B_STATUS[2*index]=0;
                buffer->B_SIZE[2*index] =0; 
            }

            if (buffer->B_STATUS[2*index+1] == 1)
                        {
                                for (int i=0;i<buffer->B_SIZE[2*index+1];i++)
                                {
                                        sum+=buffer->B[2*index+1][i];
                                }
                                buffer->B_STATUS[2*index+1]=0;
                                buffer->B_SIZE[2*index+1] =0;
                        }

        }
        printf("Sum of consumer %d = %d \n",index,sum);
    };

};
int main()
{
    sharedBuffer b;
    producer p(&b);
    consumer c1(0,&b),c2(1,&b),c3(2,&b),c4(3,&b);
        thread p_t(&producer::produce,p);
    thread c1_t(&consumer::consume,c1);
    thread c2_t(&consumer::consume,c2);
    thread c3_t(&consumer::consume,c3);
    thread c4_t(&consumer::consume,c4);
    p_t.join();c1_t.join();c2_t.join();c3_t.join();c4_t.join();
}
EN

回答 2

Stack Overflow用户

发布于 2015-07-24 07:01:02

这在很多方面都有缺陷。编译器可以重新排序您的指令,不同的CPU内核可能会看到不同顺序的内存操作。

基本上,你的生产者是这样做的:

  1. 它将数据写入缓冲区
  2. 它设置标志

你的消费者会这样做:

  1. 它读取标志
  2. ,如果该标志是它想要的,它读取数据
  3. ,它重置标志

这是不起作用的,有几个原因。

  • 编译器可以重新排序您的指令(在消费者和生产者端),以便以不同的顺序执行操作。例如,在生产者端,它可以将所有计算存储在寄存器中,然后先将状态标志写入内存,然后再写入数据。然后消费者将获得过时的数据。
  • 即使没有这种情况,也不能保证不同的CPU核心以相同的顺序看到对内存的不同写入(例如,如果它们有不同的高速缓存,并且您的标志和数据在不同的高速缓存线上)。

这可能会导致各种各样的问题--数据损坏、死锁、段错误,这取决于您的代码到底做了什么。我还没有充分分析您的代码来告诉您这会导致死锁的确切原因,但我一点也不感到惊讶。

请注意,'volatile‘关键字对于这种类型的同步是完全无用的。‘'volatile’只适用于信号处理(unix信号),而不适用于多线程代码。

正确的方法是使用适当的同步(例如互斥锁)或原子操作(例如std:: atomic )。他们有各种不同的保证,以确保上述问题不会发生。

如果速度不是最重要的,Mutexes通常更容易使用。原子操作可以让您获得更多的控制,但它们的使用非常棘手。

我建议您使用互斥锁来执行此操作,然后分析程序,然后仅在速度不够快的情况下才执行原子操作。

valgrind是一个很棒的工具,可以用来调试多线程程序(它可以指出不同步的内存访问等等)。

票数 0
EN

Stack Overflow用户

发布于 2015-07-24 13:13:12

谢谢你的有用的评论。我认为如果确保所有的标志/状态值都是从内存中读取的,而不是从寄存器/缓存中读取的,那么无论编译器如何重新组织指令,死锁都应该不会发生。volatile关键字应强制执行此操作。看起来我的理解是错的。

另一件令人费解的事情是,我以为状态变量的值应该只是(0,1,2)中的一个,但有一次,我看到了像5384这样的值。不知何故,数据被破坏了。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31599275

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档