
C++14 Lockless Multi-Producer Multi-Consumer Queue

Code Review user
Asked on 2022-02-01 01:25:38
1 answer · 1.2K views · 0 followers · 2 votes

Introduction

This is a follow-up to a previous question of mine, in which I presented another queue of the same type to get some feedback. Several people pointed out fundamental mistakes I had made, and I came to realise how naive I had been about padding variables. This is an updated version, built using the feedback I received in the previous thread.

The new queue is 32-bit, and is intended to be portable and lightweight.

Bounded circular buffer & two cursors

I still use a bounded circular buffer to store/load the data. It still uses two cursors, which tell the next producer/consumer which index on the buffer they should use. When a producer/consumer wants to increment its respective cursor, both cursors are loaded at once, because both are contained within an aligned struct. This ensures access to the cursors is truly shared, since they are never loaded individually. Once the cursors have been loaded, a full/empty check is performed before a CAS on the object that holds both cursors. If either cursor has been changed in the meantime, the CAS fails and we try again. When the CAS succeeds, the data can be placed into or removed from the buffer. To calculate the index into the buffer, an index mask is used, which is one less than a power of two, so we can wrap the index back to zero with a bitwise AND instead of modulo. For this to work, the queue size must be a power of two, so the user-specified size is rounded up to the next power of two.

Circular buffer nodes & spin lock

While the cursors are protected by the CAS operation, each node on the buffer is protected by a spin lock. This guards against the case where a consumer tries to read data before a producer has finished writing it, or the opposite case, where a producer tries to write data before a consumer has finished reading it.

The code

Here is the full source code, with many of the Doxygen-style comments removed for clarity.

// SPDX-License-Identifier: GPL-2.0-or-later
/**
 * C++14 32bit Lockless Bounded Circular MPMC Queue type.
 * Author: Primrose Taylor
 */

#ifndef BOUNDED_CIRCULAR_MPMC_QUEUE_H
#define BOUNDED_CIRCULAR_MPMC_QUEUE_H

#include <cstdio>
#include <cstdlib>

#include <atomic>
#include <cstdint>
#include <functional>
#include <thread>
#include <type_traits>

#define CACHE_LINE_SIZE     64U

#if defined(_MSC_VER)
    #define HARDWARE_PAUSE()                _mm_pause();
    #define _ENABLE_ATOMIC_ALIGNMENT_FIX    1 // MSVC atomic alignment fix.
    #define ATOMIC_ALIGNMENT                4
#else
    #define ATOMIC_ALIGNMENT                16
    #if defined(__clang__) || defined(__GNUC__)
        #define HARDWARE_PAUSE()            __builtin_ia32_pause();
    #endif
#endif

/**
 * Lockless, Multi-Producer, Multi-Consumer, Bounded Circular Queue type.
 * The type is intended to be light weight & portable.
 * The sub-types are all padded to fit within cache lines. Padding may be put
 * in between member variables if the variables are accessed separately.
 */
template <typename T, uint_fast32_t queue_size, bool should_yield_not_pause>
class bounded_circular_mpmc_queue final
{
    /**
     * Simple, efficient spin-lock implementation.
     * A function that takes a void lambda function can be used to
     * conveniently do something which will be protected by the lock.
     * @cite Credit to Erik Rigtorp https://rigtorp.se/spinlock/
     */
    class spin_lock
    {
        std::atomic<bool> lock_flag;
        
    public:
        spin_lock()
            : lock_flag{false}
        {
        }

        void do_work_through_lock(const std::function<void()> functor)
        {
            lock();
            functor();
            unlock();
        }
        
        void lock()
        {
            while (true)
            {
                if (!lock_flag.exchange(true, std::memory_order_acquire))
                {
                    break;
                }

                while (lock_flag.load(std::memory_order_relaxed))
                {
                    should_yield_not_pause ? std::this_thread::yield() : HARDWARE_PAUSE();
                }
            }
        }

        void unlock()
        {
            lock_flag.store(false, std::memory_order_release);
        }
    };

    /**
     * Structure that holds the two cursors.
     * The cursors are held together because we'll only ever be accessing
     * them both at the same time.
     * We don't directly align the struct because we need to use it as an
     * atomic variable, so we must align the atomic variable instead.
     */
    struct cursor_data
    {
        uint_fast32_t producer_cursor;
        uint_fast32_t consumer_cursor;
        uint8_t padding_bytes[CACHE_LINE_SIZE -
            sizeof(uint_fast32_t) -
            sizeof(uint_fast32_t)
            % CACHE_LINE_SIZE];

        cursor_data(const uint_fast32_t in_producer_cursor = 0,
            const uint_fast32_t in_consumer_cursor = 0)
            : producer_cursor(in_producer_cursor),
            consumer_cursor(in_consumer_cursor),
            padding_bytes{0}
        {
        }
    };

    /**
     * Structure that represents each node in the circular buffer.
     * Access to the data is protected by a spin lock.
     * Contention on the spin lock should be minimal, as it's only there
     * to prevent the case where a producer/consumer may try work with an element before
     * someone else has finished working with it. The data and the spin lock are separated by
     * padding to put them in different cache lines, since they are not accessed
     * together in the case mentioned previously. The problem with this is
     * that in low contention cases, they will be accessed together, and thus
     * should be in the same cache line. More testing is needed here.
     */
    struct buffer_node
    {
        T data;
        uint8_t padding_bytes_0[CACHE_LINE_SIZE -
            sizeof(T) % CACHE_LINE_SIZE];
        spin_lock spin_lock_;
        uint8_t padding_bytes_1[CACHE_LINE_SIZE -
            sizeof(spin_lock)
            % CACHE_LINE_SIZE];

        buffer_node()
            : spin_lock_(),
            padding_bytes_0{0},
            padding_bytes_1{0}
        {
        }

        void get_data(T& out_data)
        {
            spin_lock_.do_work_through_lock([&]()
            {
                out_data = data;
            });
        }

        void set_data(const T& in_data)
        {
            spin_lock_.do_work_through_lock([&]()
            {
               data = in_data; 
            });
        }
    };

    /**
     * Structure that contains the index mask, and the circular buffer.
     * Both are accessed at the same time, so they are not separated by padding.
     */
    struct alignas(CACHE_LINE_SIZE) circular_buffer_data
    {
        const uint_fast32_t index_mask;
        buffer_node* circular_buffer;
        uint8_t padding_bytes[CACHE_LINE_SIZE -
            sizeof(const uint_fast32_t) -
            sizeof(buffer_node*)
            % CACHE_LINE_SIZE];

        circular_buffer_data()
            : index_mask(get_next_power_of_two()),
            padding_bytes{0}
        {
            static_assert(queue_size > 0, "Can't have a queue size <= 0!");
            static_assert(queue_size <= 0xffffffffU,
                "Can't have a queue length above 32bits!");
            static_assert(
                std::is_copy_constructible_v<T>  ||
                std::is_copy_assignable_v<T>     ||
                std::is_move_assignable_v<T>     ||
                std::is_move_constructible_v<T>,
                "Can't use non-copyable, non-assignable, non-movable, or non-constructible type!"
        );

            /** Contiguously allocate the buffer.
              * The theory behind using calloc and not aligned_alloc
              * or equivalent, is that the memory should still be aligned,
              * since calloc will align by the type size, which in this case
              * is a multiple of the cache line size.
              */
            circular_buffer = (buffer_node*)calloc(
                index_mask + 1, sizeof(buffer_node));
        }

        ~circular_buffer_data()
        {
            if(circular_buffer != nullptr)
            {
                free(circular_buffer);
            }
        }
        
    private:
        /**
         * @cite https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
         */
        uint_least32_t get_next_power_of_two()
        {
            uint_least32_t v = queue_size;

            v--;
            v |= v >> 1;
            v |= v >> 2;
            v |= v >> 4;
            v |= v >> 8;
            v |= v >> 16;
            v++;
            
            return v;
        }
    };
    
public:
    bounded_circular_mpmc_queue()
        : cursor_data_(cursor_data{}),
        circular_buffer_data_()
    {
    }

    bool push(const T& in_data)
    {
        cursor_data current_cursor_data;

        // An infinite while-loop is used instead of a do-while, to avoid
        // the yield/pause happening before the CAS operation.
        while(true)
        {
            current_cursor_data = cursor_data_.load(std::memory_order_acquire);

            // Check if the buffer is full..
            if (current_cursor_data.producer_cursor + 1 == current_cursor_data.consumer_cursor)
            {
                return false;
            }

            // CAS operation used to make sure the cursors have not been incremented
            // by another producer/consumer before we got to this point, and to then increment
            // the cursor by 1 if it hasn't been changed.
            if (cursor_data_.compare_exchange_weak(current_cursor_data,
            {current_cursor_data.producer_cursor + 1,
                current_cursor_data.consumer_cursor},
            std::memory_order_release, std::memory_order_relaxed))
            {
                break;
            }

            should_yield_not_pause ? std::this_thread::yield() : HARDWARE_PAUSE();
        }

        // Set the data
        circular_buffer_data_.circular_buffer[
            current_cursor_data.producer_cursor & circular_buffer_data_.index_mask
            ].set_data(in_data);
        
        return true;
    }

    bool pop(T& out_data)
    {
        cursor_data current_cursor_data;

        while(true)
        {
            current_cursor_data = cursor_data_.load(std::memory_order_acquire);

            // Check if the queue is empty.. 
            if (current_cursor_data.consumer_cursor == current_cursor_data.producer_cursor)
            {
                return false;
            }

            if (cursor_data_.compare_exchange_weak(current_cursor_data,
            {current_cursor_data.producer_cursor,
                current_cursor_data.consumer_cursor + 1},
                std::memory_order_release, std::memory_order_relaxed))
            {
                break;
            }
            
            should_yield_not_pause ? std::this_thread::yield() : HARDWARE_PAUSE();
        }

        // Get the data
        circular_buffer_data_.circular_buffer[
            current_cursor_data.consumer_cursor & circular_buffer_data_.index_mask
            ].get_data(out_data);
        
        return true;
    }
    
    uint_fast32_t size() const
    {
        const cursor_data cursors = cursor_data_.load(std::memory_order_acquire);
        return cursors.producer_cursor - cursors.consumer_cursor;
    }

    bool empty() const
    {
        return size() == 0;
    }

    bool full() const
    {
        return size() == circular_buffer_data_.index_mask + 1;
    }
    
private:
    alignas(CACHE_LINE_SIZE) std::atomic<cursor_data> cursor_data_;
    circular_buffer_data circular_buffer_data_;
    
private:
    bounded_circular_mpmc_queue(
        const bounded_circular_mpmc_queue&) = delete;
    bounded_circular_mpmc_queue& operator=(
        const bounded_circular_mpmc_queue&) = delete;
};

#endif

I'd like to know whether my push/pop methods actually work the way I think they do. Are there any potential ABA problems? And is using a spin lock to protect each node the best approach? I used one because, in theory, it should rarely be contended, since in most cases no one else will still be working with the node.

Any help would be greatly appreciated! Cheers.

1 Answer

Code Review user

Answer accepted

Posted on 2022-02-01 20:59:16

1 vote
The original page content was provided by Code Review.
Original link:

https://codereview.stackexchange.com/questions/273599
