在下面的场景中,最不需要的内存屏障是什么?
几个线程并行地更新数组int a[n]的元素。所有元素最初都设置为零。每个线程为每个元素计算一个新值;然后,它将计算出来的新值与存储在数组中的现有值进行比较,并且只有当新值大于存储值时才写入新值。例如,如果一个线程为a[0]计算一个新的值5,但是a[0]已经是10,那么线程不应该更新a[0]。但是,如果线程计算一个新的值10,而a[0]是5,那么线程必须更新a[0]。
新值的计算涉及一些共享只读数据;它根本不涉及数组。
当上面提到的线程正在运行时,没有其他线程访问数组.在保证所有线程完成更新之后,该数组将在以后使用。
该实现使用比较和交换循环,将元素包装到atomic_ref (来自Boost或C++20):
for (int k = 0; k != n; ++k) // For each element of the array
{
// Locally in this thread, compute the new value for a[k].
int new_value = ComputeTheNewValue(k);
// Establish atomic access to a[k].
atomic_ref<int> memory(a[k]);
// [Barrier_0]: Read the existing value.
int existing_value = memory.load(memory_order_relaxed);
while (true) // The compare-and-swap loop.
{
// Overwrite only with higher values.
if (new_value <= existing_value)
break;
// Need to update a[k] with the higher value "new_value", but
// only if a[k] still stores the "existing_value".
if (memory.compare_exchange_weak(existing_value, new_value,
/*Barrier_1*/ memory_order_relaxed,
/*Barrier_2*/ memory_order_relaxed))
{
// a[k] was still storing "existing_value", and it has been
// successfully updated with the higher "new_value".
// We're done, and we may exit the compare-and-swap loop.
break;
}
else
{
// We get here in two cases:
// 1. a[k] was found to store a value different from "existing_value", or
// 2. the compare-and-swap operation has failed spuriously.
// In the first case, the new value stored in a[k] has been loaded
// by compare_exchange_weak() function into the "existing_value" variable.
// Then, we need to compare the "new_value" produced by this thread
// with the newly loaded "existing_value". This is achieved by simply continuing the loop.
// The second case (the spurious failure) is also handled by continuing the loop,
// although in that case the "new_value <= existing_value" comparison is redundant.
continue;
}
}
}此代码涉及三个内存屏障:
Barrier_0在memory.load().Barrier_1,中使用,在读-修改-写时使用compare_exchange_weak() succeeds.Barrier_2,,在加载操作中使用compare_exchange_weak()失败。在这种情况下,当所有三个参数都设置为relaxed时,代码是否保证只使用更高的值进行更新?如果没有,需要什么最低限度的障碍来保证corrrect行为?
发布于 2022-02-08 20:48:40
放松是好的,你不需要任何订购wrt。在更新过程中访问任何其他元素。对于对同一位置的访问,C++保证每个位置分别存在“修改顺序”,即使relaxed操作也只能看到加载或RMWed之间位置的修改顺序相同或更高的值。
您只是从CAS重试循环中构建原子fetch_max原语。由于其他作者都在做同样的事情,所以每个位置的值都是单调增加的。因此,无论何时,只要看到一个大于new_value的值,就可以完全安全地退出。
对于主线程在最后收集结果,确实需要发布/获取同步,比如thread.join或某种标志。(例如,一个计数器的fetch_sub(1, release)表示还有多少个线程还有工作要做,或者一个done标志数组,这样你就可以做一个纯存储了。)
顺便说一句,这似乎很慢,很多时间都在等待高速缓存线在内核之间反弹。(很多错误的分享。)理想情况下,您可以有效地将其更改为让每个线程在数组的不同部分上工作(例如,为相同的索引计算多个候选项,这样它就不需要任何原子内容)。
I不能保证计算的索引不重叠。在实践中,这种重叠通常很小,但无法消除。
所以很明显这是个否定。如果不同线程所接触的索引位于不同的缓存行( 16 int32_t块),那么就不会有太多的错误共享。(而且,如果计算成本很高,所以生成值的速度不太快,这很好,所以原子更新不是代码花费的大部分时间。)
--但是如果是重大争用,并且数组不是很大,您可以为每个线程提供自己的输出数组,并在最后收集结果。例如,让一个线程为每个循环执行4到8个数组的a[i] = max(a[i], b[i], c[i], d[i])。(一次读取的流不多,输入的数量也不多,因为这可能无法有效编译)。这应该从SIMD中受益,例如SSE4.1 pmaxsd执行4个并行max操作,因此这应该主要受到L3缓存带宽的限制。
或者将最大工作作为第二个并行阶段在线程之间进行划分,每个线程在输出数组的部分上执行上述操作。或者让thread_id % 4 == 0减少来自自身和接下来的3个线程的结果,所以如果您有一个具有多个线程的系统,那么您就有了一棵缩减树。
https://stackoverflow.com/questions/71040243
复制相似问题