我希望使用std::atomic创建一个无锁队列。
这可能不是我第一次尝试这么做的好机会:
template <typename T>
class atomic_queue
{
public:
using value_type = T;
private:
struct node
{
value_type m_value;
node* m_next;
node* m_prev;
node(const value_type& value) :
m_value(value),
m_next(nullptr),
m_prev(nullptr) {}
};
private:
std::atomic<node*> m_head = nullptr;
std::atomic<node*> m_tail = nullptr;
public:
void push(const value_type& value)
{
auto new_node = new node(value);
node* tmp = nullptr;
if (m_tail.compare_exchange_strong(tmp, new_node))
{
m_head.store(new_node, std::memory_order_relaxed);
return;
}
node* old_tail;
do {
old_tail = m_tail;
new_node->m_prev = old_tail;
} while (!m_tail.compare_exchange_strong(old_tail, new_node));
new_node->m_prev->m_next = new_node;
}
void pop()
{
if (m_head.load(std::memory_order_relaxed) == nullptr)
{
return;
}
node* tmp = nullptr;
node* head = m_head;
if (m_tail.compare_exchange_strong(head, tmp))
{
m_head.store(tmp, std::memory_order_relaxed);
return;
}
node* old_head;
do {
old_head = m_head;
} while (m_head && !m_head.compare_exchange_strong(old_head, old_head->m_next));
if (old_head)
{
delete old_head;
}
}
bool empty()
{
return m_head.load(std::memory_order_relaxed) == nullptr;
}
value_type& front()
{
node* head = m_head.load(std::memory_order_acquire);
return head->m_value;
}
};这里要注意的是,我将m_prev存储在node上,这样我就可以在成功的push之后更新m_next of m_tail,而不必通过m_tail进行实际操作,以防它已经被另一个线程更改了。因此,即使另一个线程已经到达push一个新值,当前线程仍然会将它所看到的m_tail的m_next链接到新节点。
据我所知,有几件事情并不是真正的线程安全,而且我也想不出解决这些问题的好办法:
让我们假设队列中的thread1 pops是惟一的条目,然后进入下面的if语句:
node* tmp = nullptr;
node* head = m_head;
if (m_tail.compare_exchange_strong(head, tmp))
{
// Now thread2 kicks in
m_head.store(tmp, std::memory_order_relaxed);
return;
}让我们假设thread2在标记的点插入到push队列的一个新值,下面的语句将被执行:
node* tmp = nullptr;
if (m_tail.compare_exchange_strong(tmp, new_node))
{
m_head.store(new_node, std::memory_order_relaxed);
return;
}让我们假设它完成了它的pushing而没有thread1继续,只有在thread1继续之后,thread1才会执行:
m_head.store(tmp, std::memory_order_relaxed);
return;根据我的理解,在这种情况下,内存顺序帮不了我,所以我不知道我的选择是什么?
另一个有问题的场景是,假设有两个读取器线程thread3和thread4在做相同的工作:
while (true)
{
if (!q.empty())
{
int v = q.front();
q.pop();
std::stringstream stream;
stream << "thread_3/4: " << v << '\n';
std::cout << stream.str();
}
}让我们假设队列大小为1,因此它们都可以看到队列不是空的,并获得对前端数据的引用,然后弹出元素并打印相同的结果。
在我看来,锁定在这个场景中是有帮助的,但是我不希望使用锁定,而且我也不希望读取线程关心同步问题,因为接口本身应该是负责的,但是由于front和pop是独立的,所以我不认为有一个很好的方法来处理这个问题。
还有一个问题是front可能访问nullptr,所以即使在这里,我也不知道如何处理这个问题。我可以让接口返回一个原始指针,或者std::optional,但在我看来,这两种解决方案似乎都不正确,所以我很想听听关于在这里应该做什么的意见。
另外,我不确定是否可以使用比CAS更便宜的方法,我知道我可以采用唯一的插槽方法,即每个线程通过在std::atomic<int> slot类型的原子上使用std::atomic<int> slot来将索引放入一个固定的数组中,因此每个线程都将队列推送到一个唯一的索引中,但我不喜欢这种方法,因为它限制了固定大小的队列。另一方面,使用new和delete可能也不是最快的事情,我可以使用一个排序的池分配器,但是我必须确保它是同步的--这是一个新的痛苦级别。
我甚至不确定这些是所有的问题,这些是我在我的实现中能发现的问题,我确信我没有想过所有的事情(或者可能是我想过的?),不管怎样,我很想听听你对所描述的问题的想法,以及克服这些问题的方法。
发布于 2020-09-26 14:28:22
在您的实现中有几个问题,其中一些您已经正确识别了。
m_head.store操作之间在m_tail上的CAS之后的竞争 do {
old_head = m_head;
} while (m_head && !m_head.compare_exchange_strong(old_head, old_head->m_next));pop中的一个节点之后,您将立即对其进行delete,但是在那个时候,另一个线程可能仍然有一个对它的引用并访问它(例如,pop中的另一个线程),从而导致一个事后使用。(这也称为内存回收问题。)
Explanation:假设两个线程当前在pop中,并且已经将相同的值读入old_head中。第一个线程继续进行,在m_head上执行CAS,然后在下一步中立即删除old_head。直到现在,第二个线程才继续自己更新m_head的尝试,使用old_head->m_next作为新的值。这意味着线程两个取消了指向刚刚删除的节点的指针。设计无锁甚至无锁的算法本身就很困难。问题2和3都可以通过使用内存回收方案来解决。问题4.通常通过不使用front操作来避免,而是让pop返回项(直接通过std::optional返回,或者通过try_pop版本通过引用获取输出参数,并返回指示操作是否成功的bool )。
无论是哪种方式,我都建议使用一种已经建立的无锁算法,如迈克尔斯科特排队。不幸的是,如果您决定实现该算法,您仍然需要处理内存回收问题。
我可以向您推荐我的锡金,它不仅提供了Michael队列的实现,而且还提供了几个内存回收方案,以防您还想自己做一些实验,但希望避免与安全的内存回收有关的麻烦。
内存回收方案是解决内存回收问题的一种算法。针对安全内存恢复问题,提出了很多解决方案,如危险点或基于时间的恢复,但每种方案都有其缺点。这就是为什么内存回收问题仍然被视为共享内存并发中当前最困难的开放问题。关于更多的细节,我可以参考我的硕士论文C++中无锁数据结构的有效内存回收。它不仅解释了内存回收问题和大量建议的回收方案,而且还讨论了基于通用接口的其中一些方案的实现。锡金建立在这一工作的基础上。
https://stackoverflow.com/questions/64072271
复制相似问题