首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >线程安全无锁FIFO队列

线程安全无锁FIFO队列
EN

Code Review用户
提问于 2015-07-24 20:41:03
回答 1查看 4.6K关注 0票数 4

几年前,我的项目需要在两个线程之间添加一个FIFO队列。在那个时候,我有一些有趣的想法,如何在没有任何atomics和锁的情况下做到这一点。(这个算法有一个讨论,但它是俄文的。)

其想法如下:

我们有两个线程:作者和读者。Queue类有两个单独的队列:一个用于读取器,另一个用于编写器。

读取器从队列中读取数据,直到数据变为空。作者写入自己的队列,每次写完后,它会检查读取器是否有什么可读的。如果没有,那么编写器将其队列传递给读者,并为自己启动新的队列。

读者和作家相互接触的唯一地方是把作家的队伍传递给读者,但这是安全的。有关详细信息,请参阅下面的代码注释或GitHub

当作者将数据写入自己的队列中,但读者仍有一些需要阅读的时候,才是我们能够面对的唯一问题。这样,作者就不会将队列传递给读者。如果在此之后,作者在很长一段时间内都不会写任何数据,那么我们就会遇到这样的情况:在作者的队列中有一个数据,但读者却无法访问它。

这可以通过两种方式解决:

  1. 写入器可以设置一个标志,通知读者,写入器将不再将任何数据写入队列。在这种情况下,读取器将安全地获取写入器队列本身。
  2. 当作者仍计划将一些数据放入队列时,它可以调用刷新方法(可能几次)来强制将其队列传递给读取器。

注意:还可以添加对多个作者和/或读取器的支持。这可以通过添加两个锁来实现:一个用于编写器,另一个用于读者。在这种情况下,作者会阻止作者,读者会阻止读者,但作者不会阻止读者。

我刚刚和我的朋友验证了这个算法,他在线程安全方面很好,我们没有发现任何问题。

UPD:正如下面的JS1所提到的,这段代码只有在具有强大内存排序保证的平台上才能正常工作。对于其他人来说,应该有单独的实现。

你能看一下代码吗,这样你就能找到我遗漏的一些问题了吗?

代码语言:javascript
复制
template <class T>
class LockFreeQueue
{
public:
    /**
     * Write data to the queue. Writer only method.
     * Return value can be used by writer to decide when Flush() should be called.
     * 
     * @param data Value to write to the queue
     * @return true if data was send to the reader otherwise false
     */
    bool Write(T *data);

    /**
     * Read data from queue. Reader only method.
     * 
     * @param data [OUT] Data to retrieve.
     * @return true if data was retrieved otherwise false.
     */
    bool Read(T *&data);

    /**
     * Flush remained data to the reader. Writer only method.
     * 
     * @return true if data was flushed otherwise false.
     */
    bool Flush();

    /**
     * Inform that writer is finished.
     */
    void SetWriterFinished() { isWriterFinished = true; }

    /**
     * Check if writer is finished.
     */
    bool IsWriterFinished() { return isWriterFinished; }

private:
    T *readerTop    = nullptr;
    T *writerTop    = nullptr;
    T *writerBottom = nullptr;

    bool isWriterFinished = false;
};

template<class T>
bool LockFreeQueue<T>::Write(T* data)
{
    assert(!isWriterFinished);
    assert(data != nullptr);

    data->next = nullptr;

    if (writerTop != nullptr)
    {
        writerBottom->next = data;
        writerBottom = data;
    }
    else
    {
        writerTop = writerBottom = data;
    }

    if (readerTop == nullptr) // reader don't have anything to read
    {
        readerTop = writerTop; // give reader writer's queue
        writerTop = nullptr; // P1: start new writers queue
        return true;
    }
    return false;
}

template<class T>
bool LockFreeQueue<T>::Read(T*& data)
{
    // If writer stoped in Write() method before command marked as P1 there could be 2 situations:
    // 1. If writer/reader threads/cpus became synchronized reader will not go inside a following 'if' and goes to the
    //    P2.
    // 2. Otherwise reader will go inside following 'if' and will always return false because isWriterFinished will
    //    always be false in this situation (see first assert in Write() method).
    if (readerTop == nullptr)
    {
        // Magic happens here. If writer is not finished writing and we have nothing to read then we go to the 'false'
        // branch and return false.
        // If writer is finished we go to the 'true' branch but we will not interfere in accessing to the 'readerTop' or
        // 'writerTop' variables with writer because he can not call Write() method after finishing.
        if (isWriterFinished && writerTop != nullptr)
        {
            // Reader will come at this place only when writer stops writing to the queue.
            // Reader just read remaining part of the data if present.
            readerTop = writerTop;
            writerTop = nullptr;

            if (readerTop == nullptr) // nothing to read
                return false;
        }
        else
        {
            return false;
        }
    }

    // P2
    // At this place we garantee that readerTop variable is synchronized between reader and writer.
    // Also we can garantee here that readerTop != nullptr
    data = readerTop;
    readerTop = data->next;

    return true;
}

/**
 * This method should be called by writer in a case when writer doesn't write into its own queue for a long time and
 * its queue is not empty. In this case reader will not receive data from writers queue.
 * Calling of this method by writer will not influence of calling Read() method by reader.
 */
template<class T>
bool LockFreeQueue<T>::Flush()
{
    assert(!isWriterFinished);

    if (writerTop == nullptr)
    {
        return true;
    }

    if (readerTop == nullptr)
    {
        readerTop = writerTop;
        writerTop = nullptr;
        return true;
    }
    return false;
}
EN

回答 1

Code Review用户

发布于 2015-07-24 23:06:01

需要内存屏障

您的代码将在具有强内存排序保证的x86目标上工作。但它不适用于其他目标,如ARM。

下面是代码如何失败的示例。在Write()中,在编写线程中可能会发生以下事件序列:

代码语言:javascript
复制
    // Add new item to queue.
    data->next = nullptr;          // Write #1
    writerBottom->next = data;     // Write #2
    writerBottom = data;

    // Need memory barrier here!

    // Give queue to reader.
    readerTop = writerTop;         // Write #3

问题是,这些内存写入不能保证在另一个核心上以相同的顺序进行。因此,在读者的核心,它可以看到readerTop已经改变(写#3)。但是当迭代列表时,它可能看不到最后一个附加到队列中的元素(写#2)。更糟糕的是,甚至可能看不到最后一个元素(写#1)中的空指针,最终取消对一个随机指针的引用。

我没有看到您如何使用isWriterFinished的例子,但我确信它也遇到了同样的问题。

为了解决这个问题,我建议对线程之间共享的变量使用C++原子,并在读取/写入atomics时使用适当的memory_order参数来提供适当的栅栏。

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/97988

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档