首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >有效地保护关键部分,这些部分大多是读的,很少被写入。

有效地保护关键部分,这些部分大多是读的,很少被写入。
EN

Stack Overflow用户
提问于 2013-12-30 02:07:37
回答 4查看 395关注 0票数 3

我有一个多线程应用程序。

它包含了几个全局变量,大部分时间由几个不同的线程读取,很少由其他线程编写。

我可以用互斥保护他们,但在这种情况下,这是相当“昂贵”的。

在大多数情况下,读取器会“互相锁定”,如果需要优先级反转,调度程序会进行干预,等等。所有这些都没有任何真正的需要,因为单独阅读不应该有任何害处。

因此,我正在寻找其他类型的操作系统资源,它将帮助我以更低的成本实现保护这些全局变量的目标。

我想到的第一件事是数信号,但我不知道在这种情况下如何正确地使用它们,或者它们是否支持我的想法.以下是我的想法:

  1. 一个名为Sem的共享对象,它支持以下原子操作:
    • 如果计数为>= 0,则为增量计数;否则,锁定调用线程
    • 如果计数> 0,则减少计数,如果计数为== 0,则向所有挂起的线程发出信号。
    • Get:如果count == 0,设置count = -1;否则,锁定调用线程
    • 如果计数为== -1,设置count =0并向所有挂起的线程发出信号

  1. 读者:
    • Sem公司
    • 读取变量
    • 扫描电镜测试

  1. 作者:
    • 扫描电镜获取
    • 写入变量
    • 扫描电镜放置

当然,原子操作不是“免费的”,因为它们需要禁用中断。尽管如此,它似乎比使用互斥要有效得多,还有所有不必要的附加上下文切换,它们很可能伴随着互斥。

我的问题如下:

  1. 我对这个问题的解决方案正确吗?
    • 如果是,我应该为'Sem‘使用什么类型的OS资源?
    • 如果没有,陷阱在哪里?

  1. 这是一个已知的标准解决方案的问题吗?

该应用程序运行在ThreadX内核上,但任何与操作系统无关的答案都将受到高度赞赏。

EN

回答 4

Stack Overflow用户

发布于 2013-12-30 02:45:50

您可以使用System信号量实现共享锁&独占(读取器/写入器)锁,值为0的struct中的sem_op将等待零,下面是一个简单的示例:

如果有N项,则创建N*2信号量,每个项都有两个信号量,表示共享锁和独占锁:

若要共享锁定项,请执行以下操作:

  1. semop(0)在其独占锁上。
  2. 共享锁上的信号(1)。

要共享解锁项,请执行以下操作:

  1. 共享锁上的信号(-1)。

要独占锁定一个项目:

  1. 共享锁上的信号(0)。
  2. 独占锁上的信号(0)。
  3. 在排他锁上的信号(1)。

要独占解锁一个项目:

  1. 独占锁上的信号(-1)。

在semtimedop()中执行的一组操作是原子的。

票数 3
EN

Stack Overflow用户

发布于 2014-02-20 10:03:58

如果您的更新确实不频繁,并且您的读取访问非常多,以至于互斥会导致争用,那么您可以使用一种极简的事务性方法。

这假设您不需要闪电般的快速响应( 99.9%的应用程序在实现共享访问时遇到这种情况,因为简单的客户机-服务器数据流将足够且效率更高)。

基本理念

每个读取器保持参考数据的本地副本。

当作者想要执行更新时,它会在修改引用数据之后增加版本号。

读者只需检查参考版本号,才能访问本地副本。

如果数字匹配,他们可以自由地进行。

如果数字不匹配,他们必须获得一份新的副本。

朴素实现

这里是一个简单的实现。您可以通过使用读取器/写入器锁、条件变量或其他更智能的机制来替换互斥锁,从而提高效率,但由于重点是支持对未经修改的数据进行读取,因此可能没有必要进行优化。

代码语言:javascript
复制
#include <vector>
#include <thread>   
#include <atomic>
#include <mutex>

using namespace std;

// ================================
// minimalistic transaction wrapper
// ================================
template <class T>
class shared_read; // forward declaration

// --------------------------------
// transaction writer
// --------------------------------
template <class T>
class shared_write {
    friend class shared_read<T>;
    atomic<int> version;
    mutex             lock;
    T&                data;

public:
    shared_write(T& obj) : data(obj), version(0) {}

    void update (T& new_version)
    {
        unique_lock<mutex> waiter(lock);
        data = new_version;
        version++;
    }
};

// --------------------------------
// transaction reader
// --------------------------------
template <class T>
class shared_read {
    int        version;
    shared_write<T>& reference;
    T                payload;

public:
    // take an initial copy on construction
    shared_read(shared_write<T>& ref) : reference(ref), payload(ref.data), version(ref.version) {}

    // get an up to date copy
    const T& get(void)
    {
        // update to latest version
        while (version != reference.version)
        {
            unique_lock<mutex> waiter(reference.lock);
            payload = reference.data;
            version = reference.version;
        }

        // return local copy
        return payload;
    }
};

用法

为此,为目标全局对象创建一个全局写包装器,并让每个线程从写包装器中创建一个读包装器:

代码语言:javascript
复制
// at toplevel
Something global;
shared_write<Something> writer;

// inside each reader thread
void reader_worker (void)
{
    shared_read<Something> reader(writer);

    ...

    const Something& my_something_copy = reader.get();
}

// inside each writer thread
void writer_worker (void)
{
    Something my_new_version_of_something;
    my_new_version_of_something.initialize (whatever);
    writer.update (my_new_version_of_something);
}

警告

当生成新版本时,所有读者都会排队检索新版本,这将带来相当大的成本。

这种方法在很大程度上支持成功的读取(成功的读取具有可忽略的开销,而更新的成本较高),并且响应性稍差(与使用序列化访问时相比,读者可以使用过时的副本稍微长一点)。

测试

下面是一个快速而肮脏的测试工具:

代码语言:javascript
复制
#include <vector>
#include <thread>
#include <ctime>

#include <atomic>
#include <mutex>

#include <iostream>

using namespace std;

// --------------------------------
// measurement
// --------------------------------
int num_refresh;

// ================================
// minimalistic transaction wrapper
// ================================
template <class T>
class shared_read;

// --------------------------------
// transaction writer
// --------------------------------
template <class T>
class shared_write {
    friend class shared_read<T>;
    atomic<int> version;
    mutex             lock;
    T&                data;

public:
    shared_write(T& obj) : data(obj), version(0) {}

    void update (T& new_version)
    {
        unique_lock<mutex> waiter(lock);
        data = new_version;
        version++;
    }
};

// --------------------------------
// transaction reader
// --------------------------------
template <class T>
class shared_read {
    int        version;
    shared_write<T>& reference;
    T                payload;

public:
    // take an initial copy on construction
    shared_read(shared_write<T>& ref) : reference(ref), payload(ref.data), version(ref.version) {}

    // get an up to date copy
    const T& get(void)
    {
        // update to latest version
        while (version != reference.version)
        {
            unique_lock<mutex> waiter(reference.lock);
            payload = reference.data;
            version = reference.version;
            num_refresh++;
        }

        // return local copy
        return payload;
    }

    // debug: get object version
    int get_version(void) { return version; }
};

// ================================
// test
// ================================

// --------------------------------
// parameters
// --------------------------------
double write_frequency;
const int num_threads = 4;
const int num_accesses = 5000000/num_threads;

int mismatches;
atomic<int> global_ref_1 = 0;
atomic<int> global_ref_2 = 0;

// --------------------------------
// some global object
// --------------------------------
struct dummy {
    int ref;
    dummy(int ref = 0) : ref(ref) {}
};

dummy global;

// --------------------------------
// transactional version
// --------------------------------

// writer wraps the global
shared_write<dummy> writer(global);

void worker1(void)
{
    // local reader
    shared_read<dummy> reader(writer);

    for (int i = 0; i != num_accesses; i++)
    {
        if ((rand() / (double)RAND_MAX) < write_frequency)
        {
            writer.update(dummy(++global_ref_1));
        }
        else
        {
            const dummy& val = reader.get();
            if (val.ref != reader.get_version()) mismatches++;
        }
    }
}

// --------------------------------
// basic mutex version
// --------------------------------

mutex write_sync;
void worker2(void)
{
    for (int i = 0; i != num_accesses; i++)
    {
        if ((rand() / (double)RAND_MAX) < write_frequency)
        {
            unique_lock<mutex> waiter(write_sync);
            global.ref = ++global_ref_2;
        }
        else
        {
            unique_lock<mutex> waiter(write_sync);
            if (global.ref != global_ref_2) mismatches++;
        }
    }
}

// --------------------------------
// test harness
// --------------------------------
int main(void)
{
    vector<thread> threads(num_threads);

    for (write_frequency = .9; write_frequency > .0005; write_frequency /= 2)
    {
        time_t start;

        // test transactional version
        num_refresh = 0;
        mismatches = 0;
        start = clock();
        for (int i = 0; i != num_threads; i++) threads[i] = thread(worker1);
        for (thread& t : threads) t.join();
        int duration1 = (int)((clock() - start) / (double)CLOCKS_PER_SEC * 1000);
        if (mismatches != 0) printf("mismatches 1 : %d\n", mismatches);

        // test mutex version
        mismatches = 0;
        start = clock();
        for (int i = 0; i != num_threads; i++) threads[i] = thread(worker2);
        for (thread& t : threads) t.join();
        int duration2 = (int)((clock() - start) / (double)CLOCKS_PER_SEC * 1000);
        if (mismatches != 0) printf("mismatches 2 : %d\n", mismatches);

        double refresh_per_access = (double)num_refresh/num_accesses/num_threads;
        printf("frequency %.3f refreshes %.3f (%2.1f) duration %d / %d ms\n", write_frequency, refresh_per_access, refresh_per_access/write_frequency, duration1, duration2);
    }
}

它是在调试模式下使用Visual 2013编译的(为了故意放慢速度),它在我的4个核心CPU上产生以下输出:

具有4个线程和5.000.000次访问:

代码语言:javascript
复制
C:\Dev\PHP\_StackOverflow\C++\TransactionalGlobal\Debug>TransactionalGlobal.exe
mismatches 1 : 754
frequency 0.900 refreshes 0.092 (0.1) duration 12956 / 7747 ms
mismatches 1 : 21947
frequency 0.450 refreshes 0.297 (0.7) duration 8878 / 7786 ms
mismatches 1 : 64116
frequency 0.225 refreshes 0.324 (1.4) duration 2535 / 7682 ms
mismatches 1 : 56733
frequency 0.113 refreshes 0.240 (2.1) duration 1606 / 8119 ms
mismatches 1 : 39681
frequency 0.056 refreshes 0.156 (2.8) duration 1109 / 5157 ms
mismatches 1 : 20363
frequency 0.028 refreshes 0.090 (3.2) duration 853 / 6982 ms
mismatches 1 : 7923
frequency 0.014 refreshes 0.049 (3.5) duration 685 / 6849 ms
mismatches 1 : 4982
frequency 0.007 refreshes 0.026 (3.7) duration 580 / 8658 ms
mismatches 1 : 1180
frequency 0.004 refreshes 0.013 (3.8) duration 543 / 6492 ms
mismatches 1 : 1378
frequency 0.002 refreshes 0.007 (3.9) duration 534 / 7491 ms
mismatches 1 : 1058
mismatches 2 : 1920
frequency 0.001 refreshes 0.003 (3.9) duration 519 / 7684 ms

您可以看到线程处理的基本成本约为520 ms。同步的额外成本随着更新/读取百分比的增加而增加,与一个简单的互斥量接近50%左右,然后变得更糟。

“不匹配”是对线程何时处理过时数据的粗略度量(即,自从该线程获得本地副本以来,其他人已经更新了数据)。

有8个线程和350.000次访问(大约少了15倍!):

代码语言:javascript
复制
C:\Dev\PHP\_StackOverflow\C++\TransactionalGlobal\Debug>TransactionalGlobal.exe
mismatches 1 : 6
frequency 0.900 refreshes 0.091 (0.1) duration 8435 / 9440 ms
mismatches 1 : 55
frequency 0.450 refreshes 0.252 (0.6) duration 7443 / 9286 ms
mismatches 1 : 224
frequency 0.225 refreshes 0.183 (0.8) duration 4270 / 7691 ms
mismatches 1 : 1032
frequency 0.113 refreshes 0.140 (1.2) duration 2061 / 7522 ms
mismatches 1 : 1202
frequency 0.056 refreshes 0.104 (1.8) duration 670 / 10036 ms
mismatches 1 : 572
frequency 0.028 refreshes 0.046 (1.6) duration 480 / 7242 ms
mismatches 1 : 834
frequency 0.014 refreshes 0.038 (2.7) duration 130 / 3521 ms
mismatches 1 : 247
frequency 0.007 refreshes 0.025 (3.6) duration 40 / 10303 ms
frequency 0.004 refreshes 0.013 (3.8) duration 40 / 10375 ms
frequency 0.002 refreshes 0.007 (4.1) duration 40 / 10234 ms
mismatches 2 : 2421
frequency 0.001 refreshes 0.003 (4.0) duration 40 / 8372 ms

毫不奇怪,拥有比可用内核更多的线程会导致性能急剧下降,这是由于每个核心上的互斥争用造成的。另一方面,处理过时数据的情况也有所减少:)。

非修改的读取情况不受互斥争用的影响,因此基本执行时间从大约520减少到40 (13倍),大约是事务总数的15倍。

在1%的修改/读取下,更新的成本仍然是可以忽略的。

票数 1
EN

Stack Overflow用户

发布于 2014-01-25 10:55:27

首先,我会试图弄清楚您的平台是否有用于锁定读/写的指令,很多当代的平台都有。至于在threadx上可用的任何同步原语,这些元素中的每一个都将禁用中断、重新调度等等,所以如果您的HW缺乏对上面的支持,并且您感到threadx互斥量是多少(多少?)“很贵”,关掉中断就行了。如果在线程循环中将全局变量读入局部变量一次,这是无害的,但在这种情况下,我会使用互斥。在threadx,它真的很便宜。

Linux读/写锁也将禁用抢占(中断)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/20832367

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档