这是对以前的一个我的问题的后续,在那里我提供了另一个相同类型的队列,以获得一些反馈。有些人指出了我犯的一些基本错误,我开始了解到,当我谈到填充变量时,我非常天真。这是一个更新的版本,使用我在前一个线程中得到的反馈创建。
新的队列为32位,其目的是便携和重量轻。
我仍然使用一个有界的循环缓冲区来存储/加载数据。它仍然使用两个游标,向下一个生产者/使用者指示他们应该使用的缓冲区上的哪个索引。当生产者/使用者希望增加各自的游标时,两个游标同时加载,因为这两个游标都包含在对齐结构中。这是为了确保真正共享对游标的访问,因为它们从不单独加载。一旦加载了游标,就会在包含两个游标的对象上的CAS之前执行充分/空检查。如果其中一个游标在过渡期间发生了更改,CAS将失败,并再次尝试。当CAS成功时,可以将数据放入缓冲区或从缓冲区中删除。为了计算缓冲区上的索引,使用了一个索引掩码,它将比2的幂小一个,因为我们可以使用按位-而不是模组将索引包装回到零。要使队列大小工作,队列大小必须是2的幂,因此用户指定的大小将提升到下一个2的幂。
虽然游标可能受到CAS操作的保护,但缓冲区上的每个节点都受到自旋锁的保护。这是为了防止消费者在生产者输入数据之前试图读取数据的情况。或者相反的情况,生产者试图在消费者完成阅读之前添加一些数据。
这是完整的源代码,为了清晰起见,删除了许多DO2风格的注释。
// SPDX-License-Identifier: GPL-2.0-or-later
/**
* C++14 32bit Lockless Bounded Circular MPMC Queue type.
* Author: Primrose Taylor
*/
#ifndef BOUNDED_CIRCULAR_MPMC_QUEUE_H
#define BOUNDED_CIRCULAR_MPMC_QUEUE_H
#include "stdio.h"
#include "stdlib.h"
#include
#include
#include
#include
#define CACHE_LINE_SIZE 64U
#if defined(_MSC_VER)
#define HARDWARE_PAUSE() _mm_pause();
#define _ENABLE_ATOMIC_ALIGNMENT_FIX 1 // MSVC atomic alignment fix.
#define ATOMIC_ALIGNMENT 4
#else
#define ATOMIC_ALIGNMENT 16
#if defined(__clang__) || defined(__GNUC__)
#define HARDWARE_PAUSE() __builtin_ia32_pause();
#endif
#endif
/**
* Lockless, Multi-Producer, Multi-Consumer, Bounded Circular Queue type.
* The type is intended to be light weight & portable.
* The sub-types are all padded to fit within cache lines. Padding may be put
* inbetween member variables if the variables are accessed seperatley.
*/
template
class bounded_circular_mpmc_queue final
{
/**
* Simple, efficient spin-lock implementation.
* A function that takes a void lambda function can be used to
* conveiniently do something which will be protected by the lock.
* @cite Credit to Erik Rigtorp https://rigtorp.se/spinlock/
*/
class spin_lock
{
std::atomic lock_flag;
public:
spin_lock()
: lock_flag{false}
{
}
void do_work_through_lock(const std::function functor)
{
lock();
functor();
unlock();
}
void lock()
{
while (true)
{
if (!lock_flag.exchange(true, std::memory_order_acquire))
{
break;
}
while (lock_flag.load(std::memory_order_relaxed))
{
should_yield_not_pause ? std::this_thread::yield() : HARDWARE_PAUSE();
}
}
}
void unlock()
{
lock_flag.store(false, std::memory_order_release);
}
};
/**
* Structure that holds the two cursors.
* The cursors are held together because we'll only ever be accessing
* them both at the same time.
* We don't directly align the struct because we need to use it as an
* atomic variable, so we must align the atomic variable instead.
*/
struct cursor_data
{
uint_fast32_t producer_cursor;
uint_fast32_t consumer_cursor;
uint8_t padding_bytes[CACHE_LINE_SIZE -
sizeof(uint_fast32_t) -
sizeof(uint_fast32_t)
% CACHE_LINE_SIZE];
cursor_data(const uint_fast32_t in_producer_cursor = 0,
const uint_fast32_t in_consumer_cursor = 0)
: producer_cursor(in_producer_cursor),
consumer_cursor(in_consumer_cursor),
padding_bytes{0}
{
}
};
/**
* Structure that represents each node in the circular buffer.
* Access to the data is protected by a spin lock.
* Contention on the spin lock should be minimal, as it's only there
* to prevent the case where a producer/consumer may try work with an element before
* someone else has finished working with it. The data and the spin lock are seperated by
* padding to put them in differnet cache lines, since they are not accessed
* together in the case mentioned previously. The problem with this is
* that in low contention cases, they will be accessed together, and thus
* should be in the same cache line. More testing is needed here.
*/
struct buffer_node
{
T data;
uint8_t padding_bytes_0[CACHE_LINE_SIZE -
sizeof(T) % CACHE_LINE_SIZE];
spin_lock spin_lock_;
uint8_t padding_bytes_1[CACHE_LINE_SIZE -
sizeof(spin_lock)
% CACHE_LINE_SIZE];
buffer_node()
: spin_lock_(),
padding_bytes_0{0},
padding_bytes_1{0}
{
}
void get_data(T& out_data) const
{
spin_lock_.do_work_through_lock([&]()
{
out_data = data;
});
}
void set_data(const T& in_data)
{
spin_lock_.do_work_through_lock([&]()
{
data = in_data;
});
}
};
/**
* Strucutre that contains the index mask, and the circular buffer.
* Both are accessed at the same time, so they are not seperated by padding.
*/
struct alignas(CACHE_LINE_SIZE) circular_buffer_data
{
const uint_fast32_t index_mask;
buffer_node* circular_buffer;
uint8_t padding_bytes[CACHE_LINE_SIZE -
sizeof(const uint_fast32_t) -
sizeof(buffer_node*)
% CACHE_LINE_SIZE];
circular_buffer_data()
: index_mask(get_next_power_of_two()),
padding_bytes{0}
{
static_assert(queue_size > 0, "Can't have a queue size <= 0!");
static_assert(queue_size <= 0xffffffffU,
"Can't have a queue length above 32bits!");
static_assert(
std::is_copy_constructible_v ||
std::is_copy_assignable_v ||
std::is_move_assignable_v ||
std::is_move_constructible_v,
"Can't use non-copyable, non-assignable, non-movable, or non-constructible type!"
);
/** Contigiously allocate the buffer.
* The theory behind using calloc and not aligned_alloc
* or equivelant, is that the memory should still be aligned,
* since calloc will align by the type size, which in this case
* is a multiple of the cache line size.
*/
circular_buffer = (buffer_node*)calloc(
index_mask + 1, sizeof(buffer_node));
}
~circular_buffer_data()
{
if(circular_buffer != nullptr)
{
free(circular_buffer);
}
}
private:
/**
* @cite https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
*/
uint_least32_t get_next_power_of_two()
{
uint_least32_t v = queue_size;
v--;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v++;
return v;
}
};
public:
bounded_circular_mpmc_queue()
: cursor_data_(cursor_data{}),
circular_buffer_data_()
{
}
bool push(const T& in_data)
{
cursor_data current_cursor_data;
// An infinite while-loop is used instead of a do-while, to avoid
// the yield/pause happening before the CAS operation.
while(true)
{
current_cursor_data = cursor_data_.load(std::memory_order_acquire);
// Check if the buffer is full..
if (current_cursor_data.producer_cursor + 1 == current_cursor_data.consumer_cursor)
{
return false;
}
// CAS operation used to make sure the cursors have not been incremented
// by another producer/consumer before we got to this point, and to then increment
// the cursor by 1 if it hasn't been changed.
if (cursor_data_.compare_exchange_weak(current_cursor_data,
{current_cursor_data.producer_cursor + 1,
current_cursor_data.consumer_cursor},
std::memory_order_release, std::memory_order_relaxed))
{
break;
}
should_yield_not_pause ? std::this_thread::yield() : HARDWARE_PAUSE();
}
// Set the data
circular_buffer_data_.circular_buffer[
current_cursor_data.producer_cursor & circular_buffer_data_.index_mask
].set_data(in_data);
return true;
}
bool pop(T& out_data)
{
cursor_data current_cursor_data;
while(true)
{
current_cursor_data = cursor_data_.load(std::memory_order_acquire);
// Check if the queue is empty..
if (current_cursor_data.consumer_cursor == current_cursor_data.producer_cursor)
{
return false;
}
if (cursor_data_.compare_exchange_weak(current_cursor_data,
{current_cursor_data.producer_cursor,
current_cursor_data.consumer_cursor + 1},
std::memory_order_release, std::memory_order_relaxed))
{
break;
}
should_yield_not_pause ? std::this_thread::yield() : HARDWARE_PAUSE();
}
// Get the data
circular_buffer_data_.circular_buffer[
current_cursor_data.consumer_cursor & circular_buffer_data_.index_mask
].get_data(out_data);
return true;
}
uint_fast32_t size() const
{
const cursor_data cursors = cursor_data_.load(std::memory_order_acquire);
return cursors.producer_cursor - cursors.consumer_cursor;
}
bool empty() const
{
return size() == 0;
}
bool full() const
{
return size() == circular_buffer_data_.index_mask + 1;
}
private:
alignas(CACHE_LINE_SIZE) std::atomic cursor_data_;
circular_buffer_data circular_buffer_data_;
private:
bounded_circular_mpmc_queue(
const bounded_circular_mpmc_queue&) = delete;
bounded_circular_mpmc_queue& operator=(
const bounded_circular_mpmc_queue&) = delete;
};
#endif我想知道我的推送/流行方法是否像我想的那样起作用?有任何可能的ABA问题吗?而使用自旋锁来保护每个节点,它们是最好的方式吗?我之所以使用它,是因为理论上它不需要经常使用,就像在大多数情况下,没有其他人仍然在处理节点。
任何帮助都会得到极大的认可!干杯。
发布于 2022-02-01 20:59:16
https://codereview.stackexchange.com/questions/273599
复制相似问题