首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何实现无锁,但阻塞行为?

如何实现无锁,但阻塞行为?
EN

Stack Overflow用户
提问于 2011-05-22 18:34:26
回答 6查看 6.1K关注 0票数 15

我正在为一个密集的网络应用程序实现一个无锁的单一生产者单消费者队列。我有一群工作线程在它们自己的单独队列中接收工作,然后将它们排掉队列并进行处理。

从这些队列中删除锁大大提高了在高负载下的性能,,但是当队列为空时,它们不再阻塞,这反过来导致CPU的使用量急剧上升。

如何有效地导致线程阻塞,直到线程成功地排出队列或被终止/中断?

EN

回答 6

Stack Overflow用户

回答已采纳

发布于 2011-05-22 18:44:00

如果您在Linux上,请考虑使用富特。它通过使用原子操作而不是像互斥锁那样使用内核调用来提供非锁定实现的性能,但是如果由于某些条件不正确(即锁争用)而需要将进程设置为空闲,那么它将发出适当的内核调用,使进程处于休眠状态,并在将来的事件中唤醒它。基本上就像一个非常快的信号量。

票数 16
EN

Stack Overflow用户

发布于 2011-05-22 18:45:01

在Linux上,可以使用富特阻止线程。但是请注意,福特克斯很狡猾

更新:与futexes相比,条件变量使用起来安全得多,而且更易于移植。但是,条件变量是与互斥体结合使用的,因此严格地说,结果将不再是无锁的。但是,如果您的主要目标是性能(而不是全局进度的保证),并且锁定部分(即线程唤醒后检查的条件)很小,则可能会发生这样的情况:您将获得满意的结果,而无需将futexes集成到算法中的微妙之处。

票数 9
EN

Stack Overflow用户

发布于 2011-05-23 01:40:05

如果您在Windows上,您将无法使用futexes,但Windows有一个类似的机制称为关键事件。不幸的是,这不是已发布的API的一部分(它是NTDLL本机API),但只要您接受Windows的未来版本中可能会更改的警告(并且您不需要在预Vista内核上运行),您就可以使用它。一定要阅读上面链接的那篇文章。下面是一个未测试的草图,说明它的工作原理:

代码语言:javascript
复制
/* Interlocked SList queue using keyed event signaling */

struct queue {
    SLIST_HEADER slist;
    // Note: Multiple queues can (and should) share a keyed event handle
    HANDLE keyed_event;
    // Initial value: 0
    // Prior to blocking, the queue_pop function increments this to 1, then
    // rechecks the queue. If it finds an item, it attempts to compxchg back to
    // 0; if this fails, then it's racing with a push, and has to block
    LONG block_flag;
};

void init_queue(queue *qPtr) {
    NtCreateKeyedEvent(&qPtr->keyed_event, -1, NULL, 0);
    InitializeSListHead(&qPtr->slist);
    qPtr->blocking = 0;
}

void queue_push(queue *qPtr, SLIST_ENTRY *entry) {
    InterlockedPushEntrySList(&qPtr->slist, entry);

    // Transition block flag 1 -> 0. If this succeeds (block flag was 1), we
    // have committed to a keyed-event handshake
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
    if (oldv) {
        NtReleaseKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    }
}

SLIST_ENTRY *queue_pop(queue *qPtr) {
    SLIST_ENTRY *entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry)
        return entry; // fast path

    // Transition block flag 0 -> 1. We must recheck the queue after this point
    // in case we race with queue_push; however since ReleaseKeyedEvent
    // blocks until it is matched up with a wait, we must perform the wait if
    // queue_push sees us
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 1, 0);

    assert(oldv == 0);

    entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry) {
        // Try to abort
        oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
        if (oldv == 1)
            return entry; // nobody saw us, we can just exit with the value
    }

    // Either we don't have an entry, or we are forced to wait because
    // queue_push saw our block flag. So do the wait
    NtWaitForKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    // block_flag has been reset by queue_push

    if (!entry)
        entry = InterlockedPopEntrySList(&qPtr->slist);
    assert(entry);

    return entry;
}

您还可以使用类似的协议,使用细读写锁和条件变量,使用无锁的快速路径。这些是键控事件的包装器,因此它们可能比直接使用键控事件带来更多的开销。

票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/6089917

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档