我有一个多线程应用程序。
它包含了几个全局变量,大部分时间由几个不同的线程读取,很少由其他线程编写。
我可以用互斥保护他们,但在这种情况下,这是相当“昂贵”的。
在大多数情况下,读取器会“互相锁定”,如果需要优先级反转,调度程序会进行干预,等等。所有这些都没有任何真正的需要,因为单独阅读不应该有任何害处。
因此,我正在寻找其他类型的操作系统资源,它将帮助我以更低的成本实现保护这些全局变量的目标。
我想到的第一件事是数信号,但我不知道在这种情况下如何正确地使用它们,或者它们是否支持我的想法.以下是我的想法:
当然,原子操作不是“免费的”,因为它们需要禁用中断。尽管如此,它似乎比使用互斥要有效得多,还有所有不必要的附加上下文切换,它们很可能伴随着互斥。
我的问题如下:
该应用程序运行在ThreadX内核上,但任何与操作系统无关的答案都将受到高度赞赏。
发布于 2013-12-30 02:45:50
您可以使用System信号量实现共享锁&独占(读取器/写入器)锁,值为0的struct中的sem_op将等待零,下面是一个简单的示例:
如果有N项,则创建N*2信号量,每个项都有两个信号量,表示共享锁和独占锁:
若要共享锁定项,请执行以下操作:
要共享解锁项,请执行以下操作:
要独占锁定一个项目:
要独占解锁一个项目:
在semtimedop()中执行的一组操作是原子的。
发布于 2014-02-20 10:03:58
如果您的更新确实不频繁,并且您的读取访问非常多,以至于互斥会导致争用,那么您可以使用一种极简的事务性方法。
这假设您不需要闪电般的快速响应( 99.9%的应用程序在实现共享访问时遇到这种情况,因为简单的客户机-服务器数据流将足够且效率更高)。
基本理念
每个读取器保持参考数据的本地副本。
当作者想要执行更新时,它会在修改引用数据之后增加版本号。
读者只需检查参考版本号,才能访问本地副本。
如果数字匹配,他们可以自由地进行。
如果数字不匹配,他们必须获得一份新的副本。
朴素实现
这里是一个简单的实现。您可以通过使用读取器/写入器锁、条件变量或其他更智能的机制来替换互斥锁,从而提高效率,但由于重点是支持对未经修改的数据进行读取,因此可能没有必要进行优化。
#include <vector>
#include <thread>
#include <atomic>
#include <mutex>
using namespace std;
// ================================
// minimalistic transaction wrapper
// ================================
template <class T>
class shared_read; // forward declaration
// --------------------------------
// transaction writer
// --------------------------------
template <class T>
class shared_write {
friend class shared_read<T>;
atomic<int> version;
mutex lock;
T& data;
public:
shared_write(T& obj) : data(obj), version(0) {}
void update (T& new_version)
{
unique_lock<mutex> waiter(lock);
data = new_version;
version++;
}
};
// --------------------------------
// transaction reader
// --------------------------------
template <class T>
class shared_read {
int version;
shared_write<T>& reference;
T payload;
public:
// take an initial copy on construction
shared_read(shared_write<T>& ref) : reference(ref), payload(ref.data), version(ref.version) {}
// get an up to date copy
const T& get(void)
{
// update to latest version
while (version != reference.version)
{
unique_lock<mutex> waiter(reference.lock);
payload = reference.data;
version = reference.version;
}
// return local copy
return payload;
}
};用法
为此,为目标全局对象创建一个全局写包装器,并让每个线程从写包装器中创建一个读包装器:
// at toplevel
Something global;
shared_write<Something> writer;
// inside each reader thread
void reader_worker (void)
{
shared_read<Something> reader(writer);
...
const Something& my_something_copy = reader.get();
}
// inside each writer thread
void writer_worker (void)
{
Something my_new_version_of_something;
my_new_version_of_something.initialize (whatever);
writer.update (my_new_version_of_something);
}警告
当生成新版本时,所有读者都会排队检索新版本,这将带来相当大的成本。
这种方法在很大程度上支持成功的读取(成功的读取具有可忽略的开销,而更新的成本较高),并且响应性稍差(与使用序列化访问时相比,读者可以使用过时的副本稍微长一点)。
测试
下面是一个快速而肮脏的测试工具:
#include <vector>
#include <thread>
#include <ctime>
#include <atomic>
#include <mutex>
#include <iostream>
using namespace std;
// --------------------------------
// measurement
// --------------------------------
int num_refresh;
// ================================
// minimalistic transaction wrapper
// ================================
template <class T>
class shared_read;
// --------------------------------
// transaction writer
// --------------------------------
template <class T>
class shared_write {
friend class shared_read<T>;
atomic<int> version;
mutex lock;
T& data;
public:
shared_write(T& obj) : data(obj), version(0) {}
void update (T& new_version)
{
unique_lock<mutex> waiter(lock);
data = new_version;
version++;
}
};
// --------------------------------
// transaction reader
// --------------------------------
template <class T>
class shared_read {
int version;
shared_write<T>& reference;
T payload;
public:
// take an initial copy on construction
shared_read(shared_write<T>& ref) : reference(ref), payload(ref.data), version(ref.version) {}
// get an up to date copy
const T& get(void)
{
// update to latest version
while (version != reference.version)
{
unique_lock<mutex> waiter(reference.lock);
payload = reference.data;
version = reference.version;
num_refresh++;
}
// return local copy
return payload;
}
// debug: get object version
int get_version(void) { return version; }
};
// ================================
// test
// ================================
// --------------------------------
// parameters
// --------------------------------
double write_frequency;
const int num_threads = 4;
const int num_accesses = 5000000/num_threads;
int mismatches;
atomic<int> global_ref_1 = 0;
atomic<int> global_ref_2 = 0;
// --------------------------------
// some global object
// --------------------------------
struct dummy {
int ref;
dummy(int ref = 0) : ref(ref) {}
};
dummy global;
// --------------------------------
// transactional version
// --------------------------------
// writer wraps the global
shared_write<dummy> writer(global);
void worker1(void)
{
// local reader
shared_read<dummy> reader(writer);
for (int i = 0; i != num_accesses; i++)
{
if ((rand() / (double)RAND_MAX) < write_frequency)
{
writer.update(dummy(++global_ref_1));
}
else
{
const dummy& val = reader.get();
if (val.ref != reader.get_version()) mismatches++;
}
}
}
// --------------------------------
// basic mutex version
// --------------------------------
mutex write_sync;
void worker2(void)
{
for (int i = 0; i != num_accesses; i++)
{
if ((rand() / (double)RAND_MAX) < write_frequency)
{
unique_lock<mutex> waiter(write_sync);
global.ref = ++global_ref_2;
}
else
{
unique_lock<mutex> waiter(write_sync);
if (global.ref != global_ref_2) mismatches++;
}
}
}
// --------------------------------
// test harness
// --------------------------------
int main(void)
{
vector<thread> threads(num_threads);
for (write_frequency = .9; write_frequency > .0005; write_frequency /= 2)
{
time_t start;
// test transactional version
num_refresh = 0;
mismatches = 0;
start = clock();
for (int i = 0; i != num_threads; i++) threads[i] = thread(worker1);
for (thread& t : threads) t.join();
int duration1 = (int)((clock() - start) / (double)CLOCKS_PER_SEC * 1000);
if (mismatches != 0) printf("mismatches 1 : %d\n", mismatches);
// test mutex version
mismatches = 0;
start = clock();
for (int i = 0; i != num_threads; i++) threads[i] = thread(worker2);
for (thread& t : threads) t.join();
int duration2 = (int)((clock() - start) / (double)CLOCKS_PER_SEC * 1000);
if (mismatches != 0) printf("mismatches 2 : %d\n", mismatches);
double refresh_per_access = (double)num_refresh/num_accesses/num_threads;
printf("frequency %.3f refreshes %.3f (%2.1f) duration %d / %d ms\n", write_frequency, refresh_per_access, refresh_per_access/write_frequency, duration1, duration2);
}
}它是在调试模式下使用Visual 2013编译的(为了故意放慢速度),它在我的4个核心CPU上产生以下输出:
具有4个线程和5.000.000次访问:
C:\Dev\PHP\_StackOverflow\C++\TransactionalGlobal\Debug>TransactionalGlobal.exe
mismatches 1 : 754
frequency 0.900 refreshes 0.092 (0.1) duration 12956 / 7747 ms
mismatches 1 : 21947
frequency 0.450 refreshes 0.297 (0.7) duration 8878 / 7786 ms
mismatches 1 : 64116
frequency 0.225 refreshes 0.324 (1.4) duration 2535 / 7682 ms
mismatches 1 : 56733
frequency 0.113 refreshes 0.240 (2.1) duration 1606 / 8119 ms
mismatches 1 : 39681
frequency 0.056 refreshes 0.156 (2.8) duration 1109 / 5157 ms
mismatches 1 : 20363
frequency 0.028 refreshes 0.090 (3.2) duration 853 / 6982 ms
mismatches 1 : 7923
frequency 0.014 refreshes 0.049 (3.5) duration 685 / 6849 ms
mismatches 1 : 4982
frequency 0.007 refreshes 0.026 (3.7) duration 580 / 8658 ms
mismatches 1 : 1180
frequency 0.004 refreshes 0.013 (3.8) duration 543 / 6492 ms
mismatches 1 : 1378
frequency 0.002 refreshes 0.007 (3.9) duration 534 / 7491 ms
mismatches 1 : 1058
mismatches 2 : 1920
frequency 0.001 refreshes 0.003 (3.9) duration 519 / 7684 ms您可以看到线程处理的基本成本约为520 ms。同步的额外成本随着更新/读取百分比的增加而增加,与一个简单的互斥量接近50%左右,然后变得更糟。
“不匹配”是对线程何时处理过时数据的粗略度量(即,自从该线程获得本地副本以来,其他人已经更新了数据)。
有8个线程和350.000次访问(大约少了15倍!):
C:\Dev\PHP\_StackOverflow\C++\TransactionalGlobal\Debug>TransactionalGlobal.exe
mismatches 1 : 6
frequency 0.900 refreshes 0.091 (0.1) duration 8435 / 9440 ms
mismatches 1 : 55
frequency 0.450 refreshes 0.252 (0.6) duration 7443 / 9286 ms
mismatches 1 : 224
frequency 0.225 refreshes 0.183 (0.8) duration 4270 / 7691 ms
mismatches 1 : 1032
frequency 0.113 refreshes 0.140 (1.2) duration 2061 / 7522 ms
mismatches 1 : 1202
frequency 0.056 refreshes 0.104 (1.8) duration 670 / 10036 ms
mismatches 1 : 572
frequency 0.028 refreshes 0.046 (1.6) duration 480 / 7242 ms
mismatches 1 : 834
frequency 0.014 refreshes 0.038 (2.7) duration 130 / 3521 ms
mismatches 1 : 247
frequency 0.007 refreshes 0.025 (3.6) duration 40 / 10303 ms
frequency 0.004 refreshes 0.013 (3.8) duration 40 / 10375 ms
frequency 0.002 refreshes 0.007 (4.1) duration 40 / 10234 ms
mismatches 2 : 2421
frequency 0.001 refreshes 0.003 (4.0) duration 40 / 8372 ms毫不奇怪,拥有比可用内核更多的线程会导致性能急剧下降,这是由于每个核心上的互斥争用造成的。另一方面,处理过时数据的情况也有所减少:)。
非修改的读取情况不受互斥争用的影响,因此基本执行时间从大约520减少到40 (13倍),大约是事务总数的15倍。
在1%的修改/读取下,更新的成本仍然是可以忽略的。
发布于 2014-01-25 10:55:27
首先,我会试图弄清楚您的平台是否有用于锁定读/写的指令,很多当代的平台都有。至于在threadx上可用的任何同步原语,这些元素中的每一个都将禁用中断、重新调度等等,所以如果您的HW缺乏对上面的支持,并且您感到threadx互斥量是多少(多少?)“很贵”,关掉中断就行了。如果在线程循环中将全局变量读入局部变量一次,这是无害的,但在这种情况下,我会使用互斥。在threadx,它真的很便宜。
Linux读/写锁也将禁用抢占(中断)。
https://stackoverflow.com/questions/20832367
复制相似问题