我正试图通过std::atomic<std::shared_ptr>>实现一个没有锁的包装器,以便对容器这样的重要对象进行操作。我在这两个主题中找到了一些相关的信息:
但这仍然不是我需要的。
举一个例子:
TEST_METHOD(FechAdd)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic<size_t>(0);
auto thread1 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i++)
container++;
});
auto thread2 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i++)
container++;
});
thread1.join();
thread2.join();
Assert::AreEqual(loopCount * 2, container.load());
}此函数工作正常,因为增量后操作符使用内部fetch_add()原子操作。
另一方面:
TEST_METHOD(LoadStore)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic<size_t>(0);
auto thread1 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i++)
{
auto value = container.load();
value++;
container.store(value);
}
});
auto thread2 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i++)
{
auto value = container.load();
value++;
container.store(value);
}
});
thread1.join();
thread2.join();
Assert::AreEqual(loopCount * 2, container.load());
}然而,如果我用.load()和.store()操作替换它,并在这两个操作之间进行增量,结果就不一样了。这是两个原子操作,因此不能在这些操作之间进行同步。
我的最终目标是通过std::atomic<std::shared_ptr>加载对象的实际状态,执行一些非const操作,并通过存储操作再次保存它。
TEST_METHOD(AtomicSharedPtr)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic(std::make_shared<std::unordered_set<int>>());
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i++)
{
// some other lock-free synchronization primitives as barrier, conditions or?
auto reader = container.load();
reader->emplace(5);
container.store(reader);
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i++)
{
// some other lock-free synchronization primitives as barrier, conditions or?
auto reader = container.load();
reader->erase(5);
container.store(reader);
}
});
}我知道第二个线程也只有来自于shared_ptr,这只会导致数据竞争。上的原子和非const操作的这只会导致数据竞争。。
因此,任何关于如何实现将用于存储在std::atomic<std::shared_ptr>中的对象的非const操作的无锁包装器的提示。
发布于 2021-12-21 20:39:02
首先是一个侧面的人。std::atomic<std::shared_ptr<T>>提供对指针的原子访问,并且不为T提供任何同步。这是非常重要的一点。您的代码显示您正在尝试同步T,而不是指针,因此atomic没有执行您认为的操作。为了使用std::atomic<std::shared_ptr<T>>,必须将指向T视为const。
有两种方法可以以线程安全的方式处理任意数据的读-修改-写。显然,第一种方法是使用锁。这通常是更快的执行,并由于其简单,通常较少的错误,因此是高度建议。如果您真的想用原子操作来完成这个任务,这是很困难的,而且执行速度也比较慢。
它通常是这样的,您可以对指向的数据进行深度复制,修改副本,然后尝试用新的数据替换旧数据。如果有其他人在此期间更改了数据,那么您可以将其全部扔掉,然后重新开始整个突变。
template<class T, class F>
bool readModifyWrite(std::atomic<std::shared_ptr<T>>& container, F&& function) {
do {
const auto&& oldT = container.load();
//first a deep copy, to enforce immutability
auto&& newT = std::make_shared(oldT.get());
//then mutate the T
if (!function(*newT))
return false; //function aborted
//then attempt to save the modified T.
//if someone else changed the container during our modification, start over
} while(container.compare_exchange_strong(oldT, newT) == false);
//Note that this may take MANY tries to eventually succeed.
return true;
}然后,用法与您所使用的类似:
auto&& container = std::atomic(std::make_shared<std::unordered_set<int>>());
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i++)
{
readModifyWrite(container, [](auto& reader) {
reader.emplace(5);
return true;
});
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i++)
{
readModifyWrite(container, [](auto& reader) {
reader.erase(5);
return true;
});
}
});
}请注意,由于一个线程正在插入5 loopCount时间,而另一个线程正在擦除5 loopCount时间,但它们之间没有同步,所以第一个线程可能会在一行中写入几次(对于一个集合来说是无操作),然后第二个线程可能会在一行中擦除几次(对于一个集合来说是无操作),所以在这里您并不能真正保证最终结果,但我假设您知道这一点。
但是,如果您想要使用这些突变来同步,那就会变得非常复杂。如果变异函数成功或中止,则必须返回,然后readModifyWrite的调用方必须处理修改中止的情况。(请注意,readModifyWrite有效地从函数返回值,因此它从修改步骤返回值。写入步骤不影响返回值)
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; )
{
bool did_emplace = readModifyWrite(container, [](auto& reader) {
return reader.emplace(5);
});
if (did_emplace) i++;
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; )
{
bool did_erase = readModifyWrite(container, [](auto& reader) {
return reader.erase(5);
});
if (did_erase) i++;
}
});
}https://stackoverflow.com/questions/70440813
复制相似问题