首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >阻塞队列竞争条件?

阻塞队列竞争条件?
EN

Stack Overflow用户
提问于 2012-01-05 06:25:32
回答 3查看 807关注 0票数 4

我正在尝试实现一个高性能的阻塞队列,这个队列是由一个循环缓冲区支持的,它位于p线程、signaphore.h和gcc原子构建器之上。队列需要处理来自不同线程的多个同步读取器和写入器。

我已经分离出某种种族条件,我不确定这是否是关于某些原子操作和信号量行为的错误假设,或者我的设计是否有根本的缺陷。

我已经将其提取并简化为下面的独立示例。我希望这个程序永远不会回来。但是,在经过几十万次迭代之后,它会返回队列中检测到的损坏。

在下面的示例(对于公开)中,它实际上没有存储任何内容,它只是将一个存储实际数据的单元格设置为1,并将0设置为表示一个空单元格。有一个计数信号量(空缺)表示空单元格的数目,另一个计数信号量(占用率)表示被占用的单元格的数量。

作者所做的工作如下:

vacancies

  • atomically获取下一个头索引(mod队列大小)

  • 向其写入

  • 增量占用者

读者所做的正好相反:

occupants

  • atomically获取下一个尾索引(mod队列大小)

  • 从它读取

  • 增量空缺

我希望在上述情况下,精确的一个线程可以同时读取或写入任何给定的单元格。

任何关于它为什么不工作的想法或调试策略都值得赞赏。下面的代码和输出。

代码语言:javascript
复制
#include <stdlib.h>
#include <semaphore.h>
#include <iostream>

using namespace std;

#define QUEUE_CAPACITY 8 // must be power of 2
#define NUM_THREADS 2

struct CountingSemaphore
{
    sem_t m;
    CountingSemaphore(unsigned int initial) { sem_init(&m, 0, initial); }
    void post() { sem_post(&m); }
    void wait() { sem_wait(&m); }
    ~CountingSemaphore() { sem_destroy(&m); }
};

struct BlockingQueue
{
    unsigned int head; // (head % capacity) is next head position
    unsigned int tail; // (tail % capacity) is next tail position
    CountingSemaphore vacancies; // how many cells are vacant
    CountingSemaphore occupants; // how many cells are occupied

    int cell[QUEUE_CAPACITY];
// (cell[x] == 1) means occupied
// (cell[x] == 0) means vacant

    BlockingQueue() :
        head(0),
        tail(0),
        vacancies(QUEUE_CAPACITY),
        occupants(0)
    {
        for (size_t i = 0; i < QUEUE_CAPACITY; i++)
            cell[i] = 0;
    }

    // put an item in the queue
    void put()
    {
        vacancies.wait();

        // atomic post increment
        set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);

        occupants.post();
    }

    // take an item from the queue
    void take()
    {
        occupants.wait();

        // atomic post increment
        get(__sync_fetch_and_add(&tail, 1) % QUEUE_CAPACITY);

        vacancies.post();
    }

    // set cell i
    void set(unsigned int i)
    {
        // atomic compare and assign
        if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
        {
            corrupt("set", i);
            exit(-1);
        }
    }

    // get cell i
    void get(unsigned int i)
    {
        // atomic compare and assign
        if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
        {
            corrupt("get", i);
            exit(-1);
        }
    }

    // corruption detected
    void corrupt(const char* action, unsigned int i)
    {
        static CountingSemaphore sem(1);
        sem.wait();

        cerr << "corruption detected" << endl;
        cerr << "action = " << action << endl;
        cerr << "i = " << i << endl;
        cerr << "head = " << head << endl;
        cerr << "tail = " << tail << endl;

        for (unsigned int j = 0; j < QUEUE_CAPACITY; j++)
            cerr << "cell[" << j << "] = " << cell[j] << endl;
    }
};

BlockingQueue q;

// keep posting to the queue forever
void* Source(void*)
{
    while (true)
        q.put();

    return 0;
}

// keep taking from the queue forever
void* Sink(void*)
{
    while (true)
        q.take();

    return 0;
} 

int main()
{
    pthread_t id;

    // start some pthreads to run Source function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&id, NULL, &Source, 0))
            abort();

    // start some pthreads to run Sink function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&id, NULL, &Sink, 0))
            abort();

    while (true);
}

将上述内容汇编如下:

代码语言:javascript
复制
    $ g++ -pthread AboveCode.cpp
    $ ./a.out

每次输出都是不同的,但是这里有一个例子:

代码语言:javascript
复制
    corruption detected
    action = get
    i = 6
    head = 122685
    tail = 122685
    cell[0] = 0
    cell[1] = 0
    cell[2] = 1
    cell[3] = 0
    cell[4] = 1
    cell[5] = 0
    cell[6] = 1
    cell[7] = 1

我的系统是Intel Core 2上的Ubuntu 11.10:

代码语言:javascript
复制
    $ uname -a
    Linux 3.0.0-14-generic #23-Ubuntu SMP \
      Mon Nov 21 20:28:43 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux
    $ cat /proc/cpuinfo | grep Intel
    model name : Intel(R) Core(TM)2 Quad  CPU   Q9300  @ 2.50GHz
    $ g++ --version
    g++ (Ubuntu/Linaro 4.6.1-9ubuntu3) 4.6.1

谢谢安德鲁。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2012-01-05 12:14:10

一种可能的情况,一步一步地跟踪两个写入线程(W0,W1)和一个读取器线程(R0)。W0输入put()比W1早,被操作系统或硬件中断,然后完成。

代码语言:javascript
复制
        w0 (core 0)               w1 (core 1)                r0
t0         ----                      ---       blocked on occupants.wait() / take
t1      entered put()                ---                    ---         
t2      vacancies.wait()           entered put()            ---
t3      got new_head = 1           vacancies.wait()         ---
t4     <interrupted by OS>         got new_head = 2         ---
t5                                 written 1 at cell[2]     ---
t6                                 occupants.post();        ---
t7                                 exited put()            waked up
t8                                   ---               got new_tail = 1
t9     <still in interrupt>          ---    read 0 from ceil[1]  !! corruption !!
t10     written 1 at cell[1]                           
t11     occupants.post();
t12     exited put()
票数 4
EN

Stack Overflow用户

发布于 2012-01-05 09:15:37

从设计的角度来看,我会将整个队列视为共享资源,并使用单个互斥保护它。

作者所做的工作如下:

  1. 将互斥
  2. 写入队列(包括索引的处理)
  3. 释放互斥

读者所做的工作如下:

indexes)

  • free

  • 从队列中读取互斥

  • (包括处理互斥

)

票数 1
EN

Stack Overflow用户

发布于 2012-01-05 08:21:53

我有个理论。这是一个循环队列,因此一个读取线程可能会被重叠。假设读取器取索引0。在它做任何事情之前,它会失去CPU。另一个读取器线程取索引1,然后2,然后3.然后是7,然后0。第一个读取器醒来后,两个线程都认为它们对索引0具有独占访问权。不知道怎么证明。希望这能有所帮助。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/8738503

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档