我试图在代码中实现一个简单的屏障,如下所示:
void waitOnBarrier(int* barrier, int numberOfThreads) {
atomicIncrement(barrier); // atomic increment implemented in assembly
while(*barrier < numberOfThreads);
}然后在代码中有一个障碍用法:
int g_barrier = 0; // a global variable
waitOnBarrier(&g_barrier, someKnownNumberOfThreads);到目前为止还不错,但是我应该在哪里将g_barrier变量重新设置为零呢?如果我写这样的东西
g_barrier = 0;在waitOnBarrier调用之后,如果其中一个线程将比其他线程更快地从屏障中释放,并在所有其他线程仍在执行循环指令时使g_barrier无效,我将遇到一个问题,因此它们最终将永远停留在屏障上。
说明:waitOnBarrier将编译成这样的代码(伪代码):
1: mov rax, numberOfThreads
2: mov rbx, [barrier]
3: cmp rax, rbx
4: jmp(if smaller) to 2因此,如果在屏障上有两个线程同步,而thread_1在指令3或4处的某个位置慢,而速度更快的thread_2到达屏障,则传递它并继续到g_barrier零化流。这意味着,在thread_1到达指令2之后,它将在屏障处看到一个零值,并将永远停留在屏障上!
问题是,我应该如何取消g_barrier,它在代码中的位置“足够远”,我可以确定到那时所有线程都离开了屏障?还是有更正确的方法来实现障碍?
发布于 2014-10-07 11:28:10
障碍实际上是很难实施的,主要原因是新的服务员可以在所有老服务员都有机会执行之前就开始到达,这就排除了任何简单的基于计数的实现。我喜欢的解决方案是让屏障对象本身简单地指向到达屏障的第一个线程堆栈上存在的“当前屏障实例”,该实例也将是最后留下的线程(因为它不能离开,而其他线程仍在引用它的堆栈)。在Michael对我过去关于这个主题的问题的回答中包含了一个非常好的示例实现,它涉及到p线程原语(可以适应于C11锁定原语或您必须使用的任何东西):
https://stackoverflow.com/a/5902671/379897
是的,这看起来是很多工作,但是编写一个真正满足障碍契约的屏障实现并不是件简单的事。
发布于 2015-03-18 23:11:22
当我尝试做类似的事情时,我遇到了这个问题,所以我想分享我的解决方案,以防别人发现它有用。它是用纯C++11实现的(遗憾的是,它不是C11,因为标准的多线程部分在gcc和msvc中还不受支持)。
基本上,您需要维护两个计数器,它们的用法是交替的。下面是实现和使用示例:
#include <cstdio>
#include <thread>
#include <condition_variable>
// A barrier class; The barrier is initialized with the number of expected threads to synchronize
class barrier_t{
const size_t count;
size_t counter[2], * currCounter;
std::mutex mutex;
std::condition_variable cv;
public:
barrier_t(size_t count) : count(count), currCounter(&counter[0]) {
counter[0] = count;
counter[1] = 0;
}
void wait(){
std::unique_lock<std::mutex> lock(mutex);
if (!--*currCounter){
currCounter += currCounter == counter ? 1 : -1;
*currCounter = count;
cv.notify_all();
}
else {
size_t * currCounter_local = currCounter;
cv.wait(lock, [currCounter_local]{return *currCounter_local == 0; });
}
}
};
void testBarrier(size_t iters, size_t threadIdx, barrier_t *B){
for(size_t i = 0; i < iters; i++){
printf("Hello from thread %i for the %ith time!\n", threadIdx, i);
B->wait();
}
}
int main(void){
const size_t threadCnt = 4, iters = 8;
barrier_t B(threadCnt);
std::thread t[threadCnt];
for(size_t i = 0; i < threadCnt; i++) t[i] = std::thread(testBarrier, iters, i, &B);
for(size_t i = 0; i < threadCnt; i++) t[i].join();
return 0;
}发布于 2014-10-07 09:10:10
尝试实现本书中解释的屏障解决方案:
小信号灯书
https://stackoverflow.com/questions/26231727
复制相似问题