几年前,我的项目需要在两个线程之间添加一个FIFO队列。在那个时候,我有一些有趣的想法,如何在没有任何atomics和锁的情况下做到这一点。(这个算法有一个讨论,但它是俄文的。)
其想法如下:
我们有两个线程:作者和读者。Queue类有两个单独的队列:一个用于读取器,另一个用于编写器。
读取器从队列中读取数据,直到数据变为空。作者写入自己的队列,每次写完后,它会检查读取器是否有什么可读的。如果没有,那么编写器将其队列传递给读者,并为自己启动新的队列。
读者和作家相互接触的唯一地方是把作家的队伍传递给读者,但这是安全的。有关详细信息,请参阅下面的代码注释或GitHub。
当作者将数据写入自己的队列中,但读者仍有一些需要阅读的时候,才是我们能够面对的唯一问题。这样,作者就不会将队列传递给读者。如果在此之后,作者在很长一段时间内都不会写任何数据,那么我们就会遇到这样的情况:在作者的队列中有一个数据,但读者却无法访问它。
这可以通过两种方式解决:
注意:还可以添加对多个作者和/或读取器的支持。这可以通过添加两个锁来实现:一个用于编写器,另一个用于读者。在这种情况下,作者会阻止作者,读者会阻止读者,但作者不会阻止读者。
我刚刚和我的朋友验证了这个算法,他在线程安全方面很好,我们没有发现任何问题。
UPD:正如下面的JS1所提到的,这段代码只有在具有强大内存排序保证的平台上才能正常工作。对于其他人来说,应该有单独的实现。
你能看一下代码吗,这样你就能找到我遗漏的一些问题了吗?
template <class T>
class LockFreeQueue
{
public:
/**
* Write data to the queue. Writer only method.
* Return value can be used by writer to decide when Flush() should be called.
*
* @param data Value to write to the queue
* @return true if data was send to the reader otherwise false
*/
bool Write(T *data);
/**
* Read data from queue. Reader only method.
*
* @param data [OUT] Data to retrieve.
* @return true if data was retrieved otherwise false.
*/
bool Read(T *&data);
/**
* Flush remained data to the reader. Writer only method.
*
* @return true if data was flushed otherwise false.
*/
bool Flush();
/**
* Inform that writer is finished.
*/
void SetWriterFinished() { isWriterFinished = true; }
/**
* Check if writer is finished.
*/
bool IsWriterFinished() { return isWriterFinished; }
private:
T *readerTop = nullptr;
T *writerTop = nullptr;
T *writerBottom = nullptr;
bool isWriterFinished = false;
};
template<class T>
bool LockFreeQueue<T>::Write(T* data)
{
assert(!isWriterFinished);
assert(data != nullptr);
data->next = nullptr;
if (writerTop != nullptr)
{
writerBottom->next = data;
writerBottom = data;
}
else
{
writerTop = writerBottom = data;
}
if (readerTop == nullptr) // reader don't have anything to read
{
readerTop = writerTop; // give reader writer's queue
writerTop = nullptr; // P1: start new writers queue
return true;
}
return false;
}
template<class T>
bool LockFreeQueue<T>::Read(T*& data)
{
// If writer stoped in Write() method before command marked as P1 there could be 2 situations:
// 1. If writer/reader threads/cpus became synchronized reader will not go inside a following 'if' and goes to the
// P2.
// 2. Otherwise reader will go inside following 'if' and will always return false because isWriterFinished will
// always be false in this situation (see first assert in Write() method).
if (readerTop == nullptr)
{
// Magic happens here. If writer is not finished writing and we have nothing to read then we go to the 'false'
// branch and return false.
// If writer is finished we go to the 'true' branch but we will not interfere in accessing to the 'readerTop' or
// 'writerTop' variables with writer because he can not call Write() method after finishing.
if (isWriterFinished && writerTop != nullptr)
{
// Reader will come at this place only when writer stops writing to the queue.
// Reader just read remaining part of the data if present.
readerTop = writerTop;
writerTop = nullptr;
if (readerTop == nullptr) // nothing to read
return false;
}
else
{
return false;
}
}
// P2
// At this place we garantee that readerTop variable is synchronized between reader and writer.
// Also we can garantee here that readerTop != nullptr
data = readerTop;
readerTop = data->next;
return true;
}
/**
* This method should be called by writer in a case when writer doesn't write into its own queue for a long time and
* its queue is not empty. In this case reader will not receive data from writers queue.
* Calling of this method by writer will not influence of calling Read() method by reader.
*/
template<class T>
bool LockFreeQueue<T>::Flush()
{
assert(!isWriterFinished);
if (writerTop == nullptr)
{
return true;
}
if (readerTop == nullptr)
{
readerTop = writerTop;
writerTop = nullptr;
return true;
}
return false;
}发布于 2015-07-24 23:06:01
您的代码将在具有强内存排序保证的x86目标上工作。但它不适用于其他目标,如ARM。
下面是代码如何失败的示例。在Write()中,在编写线程中可能会发生以下事件序列:
// Add new item to queue.
data->next = nullptr; // Write #1
writerBottom->next = data; // Write #2
writerBottom = data;
// Need memory barrier here!
// Give queue to reader.
readerTop = writerTop; // Write #3问题是,这些内存写入不能保证在另一个核心上以相同的顺序进行。因此,在读者的核心,它可以看到readerTop已经改变(写#3)。但是当迭代列表时,它可能看不到最后一个附加到队列中的元素(写#2)。更糟糕的是,甚至可能看不到最后一个元素(写#1)中的空指针,最终取消对一个随机指针的引用。
我没有看到您如何使用isWriterFinished的例子,但我确信它也遇到了同样的问题。
为了解决这个问题,我建议对线程之间共享的变量使用C++原子,并在读取/写入atomics时使用适当的memory_order参数来提供适当的栅栏。
https://codereview.stackexchange.com/questions/97988
复制相似问题