首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >由无锁容器管理的缓冲区的完整性

由无锁容器管理的缓冲区的完整性
EN

Stack Overflow用户
提问于 2016-02-28 01:11:40
回答 3查看 132关注 0票数 2

(两个误解后的澄清:如果生成器线程的数量小于堆栈大小,则代码运行良好。只有一个消费者发布插槽。我用32个制作者和16个插槽调优这个演示的方式是快速触发坏情况)

当压力测试一个用于多线程缓冲区管理的无锁堆栈时,我发现缓冲区的内容的完整性没有得到保证。现在我确信堆栈/LIFO解决方案不是最好的选择;但我仍然想了解这些缓冲区是如何被破坏的。

其思想是:一个无锁堆栈,包含指向“空闲”缓冲区的指针。它们可以被许多生产者线程中的一个检索到。然后,缓冲区被填充数据并“分派”到单个使用者线程,最终将它们返回到堆栈。

观察结果是:-两个线程以某种方式获得了相同的缓冲区。-一个线程正在获得一个缓冲区,它的内存仍然没有从刚刚释放它的其他线程中释放出来。

为了演示起见,我可以把下面的最简单的例子放在一起:

更新:我为任何想玩它的人做了一个有更好的调试输出的版本,在这里:https://ideone.com/v9VAqU

代码语言:javascript
复制
#include <atomic>
#include <assert.h>
#include <chrono>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

using namespace std;

#define N_SLOTS 16
#define N_THREADS 32

// The data buffers that are shared among threads
class Buffer { public: int data[N_THREADS] = {0}; } buffers[N_SLOTS];

// The lock-free stack under study
class LockFreeStack
{
    Buffer* stack[N_SLOTS];
    atomic_int free_slots, out_of_slots, retries;
public:
    LockFreeStack() : free_slots(0), out_of_slots(0), retries(0) {
        for (int i=0; i<N_SLOTS; i++)
            release_buffer(&buffers[i]);
    }
    Buffer* get_buffer()
    {
        int slot = --free_slots;
        if (slot < 0) {
            out_of_slots++;
            return nullptr;
        }
/// [EDIT] CAN GET PREEMPTED RIGHT HERE, BREAKING ATOMICITY!
        return stack[slot];
    }
    void release_buffer(Buffer* buf)
    {
        int slot;
        while(true) {
            slot = free_slots;
            if (slot <= 0) {
                stack[0] = buf;
                free_slots = 1;
                break;
            }
            stack[slot] = buf;
            if (free_slots++ == slot)
                break;
            retries++;
        }
    }
    ostream& toStream(ostream& oss) {
        return oss << "LockFreeStack with free_slots=" << free_slots << ", oos=" << out_of_slots << ", retries=" << retries;
    }
} lockFreeStack;

// Utility class to help with test
class PrintQueue {
    queue<Buffer*> q;
    mutex m;
public:
    void add(Buffer* buf) {
        lock_guard<mutex> lock(m);
        q.push(buf);
    }
    Buffer* pop() {
        lock_guard<mutex> lock(m);
        Buffer* buf;
        if (q.empty())
            return nullptr;
        buf = q.front();
        q.pop();
        return buf;
    }
} printQueue;

int main()
{
    vector<thread> workers;
    for (int t = 0; t < N_THREADS; ++t) {
        workers.push_back(thread([&,t] {
            while(true) {
                auto buf = lockFreeStack.get_buffer();
                if (buf) {
                    buf->data[t] = t;
                    this_thread::sleep_for(chrono::milliseconds(10));
                    printQueue.add(buf);
                }
            }
        }));
    }
    while(true) {
        this_thread::sleep_for(chrono::milliseconds(10));
        lockFreeStack.toStream(cout) << endl;
        Buffer *buf;
        while((buf = printQueue.pop())) {
            cout << "Got Buffer " << buf << " #" << (buf-buffers) << " { ";
            int used = 0;
            for(int t=0; t<N_THREADS; t++)
                if (buf->data[t]) {
                    used += 1;
                    cout << 't' << buf->data[t] << ' ';
                    buf->data[t] = 0;
                }
            cout << "}\n";
            assert (used == 1);
            lockFreeStack.release_buffer(buf);
        }
    }
    return 0;
}

还有一个输出不良的例子:

代码语言:javascript
复制
> LockFreeStack with free_slots=-2454858, oos=2454836, retries=0
> Got Buffer 0x604a40 #12 { t7 }
> Got Buffer 0x6049c0 #11 { t8 }
> Got Buffer 0x604b40 #14 { t1 }
> Got Buffer 0x604bc0 #15 { }
> test.cpp:111: int main(): Assertion `used == 1' failed.

我试过在这里到处使用std::atomic_thread_fence(),但没什么区别。

哪里有错?

(顺便说一句,用GCC的几个版本测试,包括5.2和4.6)

EN

回答 3

Stack Overflow用户

发布于 2016-02-28 14:42:50

您的LockFreeStack代码完全破坏了。

同时从2个线程调用的release_buffer可以将2个指针插入到同一个槽中,从而丢失一个指针。

if (free_slots++ == slot)将只对一个线程成功,因此另一个线程将尝试将其指针放入另一个槽中。但是它也可能是第一个插槽的赢家,所以你可以得到相同的,但在两个插槽。

您可以通过一个线程调用release_buffer和另一个调用get_buffer来获得相同的效果。这些场景中的一种,或者两者都是导致你腐败的原因。

release_buffer没有被限制到stack的大小,所以期望缓冲区溢出,然后所有的事情都会失控。

我建议:

  1. release_buffer首先选择一个唯一的插槽原子化,然后写到它。
  2. 当多个发布器竞争时隙时,插槽中指针的写入顺序不是保证,因此您需要一些其他方法来标记插槽在release_buffer上是有效的,在get_buffer上是无效的。最简单的方法是在get_buffer中使其为空。
  3. 绑定到堆栈大小的计数器。如果你不能做到一个原子操作,拿一个副本,做所有的改变,然后cas它回来。

编辑

下面是将相同的缓冲区返回到两个单元格的场景:

代码语言:javascript
复制
                                  ////T==0  free_slots==5

// thread 1
void release_buffer(Buffer* buf)  ////T==1  buf==buffers[7]
{
    int slot;
    while(true) {                 //// 1st iteration
        slot = free_slots;        ////T==2  free_slots==5 slot==5
        if (slot <= 0) {
            stack[0] = buf;            
            free_slots = 1;            
            break;
        }                         ////*** note other threads below ***
        stack[slot] = buf;        ////     stack[5]==buffers[7]
        if (free_slots++ == slot) ////T==5 free_slots==4 slot==5 ---> go for another round
            break;
        retries++;
    }
    while(true) {                 //// 2nd iteration
        slot = free_slots;        ////T==6 free_slots==4 slot==4
        if (slot <= 0) {
            stack[0] = buf;            
            free_slots = 1;            
            break;
        }
        stack[slot] = buf;        ////     stack[4]==buffers[7] //// BOOM!!!!
        if (free_slots++ == slot) ////T==7 free_slots==5 slot==4 ---> no other round
            break;
        retries++;
    }
}

// thread 2
Buffer* get_buffer() // thread
{
    int slot = --free_slots;      ////T==3  free_slots==4
    if (slot < 0) {
        out_of_slots++;
        return nullptr;
    }
    return stack[slot];
}

// thread 3
Buffer* get_buffer()
{
    int slot = --free_slots;      ////T==4  free_slots==3
    if (slot < 0) {
        out_of_slots++;
        return nullptr;
    }
    return stack[slot];
}

编辑2:断言失败.

如果你现在还没有找到它,它就在这里:

代码语言:javascript
复制
//// producer t==0
buf->data[t] = t; //// buf->data[t] == 0

//consumer
for(int t=0; t<N_THREADS; t++)  // first iteration, t==0
  if (buf->data[t]) { //// buf->data[t] == 0, branch not taken
    used += 1;
    ...
  //// used remains ==0   -----> assert fails

在缓冲区中写入t+1将修复它。

票数 2
EN

Stack Overflow用户

发布于 2016-03-10 16:49:31

谢谢你的建议。我最终发现了这个问题:

  1. free_slots == N
  2. 来自使用者线程#2的get_buffer()接受槽N1,但是在读取stackN1中的指针之前会被抢占。
  3. release_buffer(buf) (单生产者线程#1)在理论上空出的插槽上放置了新的缓冲区,准确地说是堆栈N1!

现在,原来位于stackN-1中的指针已经丢失(内存泄漏),如果get_buffer()的下一个线程在同一时间左右醒来,它将得到与线程#2相同的指针。

票数 0
EN

Stack Overflow用户

发布于 2016-02-28 01:25:56

我在Linux上使用gcc 5.3编译并执行了以下代码:

代码语言:javascript
复制
#include <atomic>
#include <iostream>

int main()
{
    for (int i=0; i<5; ++i)
    {
        std::atomic_int n;

        std::cout << n << std::endl;

        n=4;
    }
    return 0;
}

由此产生的产出如下:

代码语言:javascript
复制
306406976
4
4
4
4

由此,我得出结论,std::atomic_int的构造函数没有显式地清除原子整数的初始值。它必须显式初始化。我想验证这一事实,因为我对原子库不太熟悉。我的结果表明,必须显式初始化std::atomic_int,它们不会自动初始化为0。

我被提示根据以下观察来验证是否初始化了std::atomic_int

  • 在这里,LockFreeStack构造函数也不会显式地初始化std::atomic_int类成员。
  • 构造函数调用release_buffer()方法。
  • release_buffer()方法读取并使用free_slots

由此,我必须得出结论,这是未定义的行为。

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35677513

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档