首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >具有两个线程的c++11无锁队列

具有两个线程的c++11无锁队列
EN

Stack Overflow用户
提问于 2020-04-17 10:57:06
回答 2查看 1K关注 0票数 0

除了主线程之外,我还有一个线程接收数据,以便将它们写入文件中。

代码语言:javascript
复制
std::queue<std::vector<int>> dataQueue;
std::mutex mutex;

void setData(const std::vector<int>& data) {
    std::lock_guard<std::mutex> lock(mutex);
    dataQueue.push(data);
}

void write(const std::string& fileName) {
    std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));

    while (store) {
        std::lock_guard<std::mutex> lock(mutex);

        while (!dataQueue.empty()) {
            std::vector<int>& data= dataQueue.front();

            ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());

            dataQueue.pop();
            }
        }
    }
}

主线程使用setData,而write实际上是写线程。我使用std::lock_quard来避免内存冲突,但是当锁定写入线程时,它会减缓主线程的速度,因为它必须等待队列被解锁。但我想我可以避免这种情况,因为线程从来不会同时对队列的相同元素进行操作。

因此,我想做它无锁,但我真的不明白我应该如何实现。我是说,我怎么能不锁任何东西就这么做呢?此外,如果写入线程比主线程快,队列可能大部分时间都是空的,因此它应该以某种方式等待新数据,而不是无限循环以检查非空队列。

编辑:我通过std::cond_variable更改了简单的std::lock_guard,以便在队列为空时等待。但是主线程仍然可以被阻塞,因为当cvQeue.wait(.)被解析时,它会重新获得锁。此外,如果主线程执行cvQueue.notify_one(),但写入线程没有等待,怎么办?

代码语言:javascript
复制
std::queue<std::vector<int>> dataQueue;
std::mutex mutex;
std::condition_variable cvQueue;

void setData(const std::vector<int>& data) {
    std::unique_lock<std::mutex> lock(mutex);
    dataQueue.push(data);
    cvQueue.notify_one();
}

void write(const std::string& fileName) {
    std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));

    while (store) {
        std::lock_guard<std::mutex> lock(mutex);

        while (!dataQueue.empty()) {
            std::unique_lock<std::mutex> lock(mutex);
            cvQueue.wait(lock);

            ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());

            dataQueue.pop();
            }
        }
    }
}
EN

回答 2

Stack Overflow用户

发布于 2020-04-18 14:21:33

如果您只有两个线程,那么您可以使用一个没有锁的单生产者-单一消费者(SPSC)队列。

在这里可以找到一个有界的版本:https://github.com/rigtor/SPSCQueue

Dmitry在这里提供了一个无界版本:http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue (您应该注意,这段代码应该适合使用atomics)。

关于阻塞pop操作--这是没有锁的数据结构所不能提供的,因为这样的操作显然不是无锁的。但是,如果队列在推送前为空,则应该以这样的方式调整链接实现,即推送操作通知条件变量。

票数 2
EN

Stack Overflow用户

发布于 2020-04-18 23:20:01

我想我有些东西能满足我的需要。我做了一个使用LockFreeQueuestd::atomic。因此,我可以原子地管理队列的头/尾状态。

代码语言:javascript
复制
template<typename T>
class LockFreeQueue {
public:
    void push(const T& newElement) {
        fifo.push(newElement);
        tail.fetch_add(1);
        cvQueue.notify_one();
    }

    void pop() {
        size_t oldTail = tail.load();
        size_t oldHead = head.load();

        if (oldTail == oldHead) {
            return;
        }

        fifo.pop();
        head.store(++oldHead);
    }

    bool isEmpty() {
        return head.load() == tail.load();
    }

    T& getFront() {
        return fifo.front();
    }

    void waitForNewElements() {
        if (tail.load() == head.load()) {
            std::mutex m;
            std::unique_lock<std::mutex> lock(m);
            cvQueue.wait_for(lock, std::chrono::milliseconds(TIMEOUT_VALUE));
        }
    }

private:
    std::queue<T> fifo;
    std::atomic<size_t> head = { 0 };
    std::atomic<size_t> tail = { 0 };
    std::condition_variable cvQueue;
};

LockFreeQueue<std::vector<int>> dataQueue;
std::atomic<bool> store(true);

void setData(const std::vector<int>& data) {
    dataQueue.push(data);
    // do other things
}

void write(const std::string& fileName) {
    std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));

    while (store.load()) {

        dataQueue.waitForNewElements();

        while (!dataQueue.isEmpty()) {
            std::vector<int>& data= dataQueue.getFront();

            ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());

            dataQueue.pop();
            }
        }
    }
}

我在waitForNewElements中仍然有一个锁,但它并没有锁定整个进程,因为它正在等待事情的完成。但最大的改善是,生产者可以推动,而消费者流行。只有当LockFreQueue::tailLockFreeQueue::head相同时,才禁止使用。这意味着队列是空的,并进入等待状态。

我不太满意的是cvQueue.wait_for(lock, TIMEOUT_VALUE)。我想做一个简单的cvQueue.wait(lock),但问题是,当涉及到结束线程时,我在主线程中执行store.store(false)。因此,如果写线程正在等待,它将永远不会在没有超时的情况下结束。因此,我设置了足够大的超时,以便大多数情况下condition_variable由锁解析,而当线程结束时,则通过超时来解析。

如果你觉得一定有问题或必须改进,可以随意评论。

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61269957

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档