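I'm very new to C, so I don't even know where to start digging into my problem. I'm trying to port a Python number-crunching algorithm to C, and since there is no GIL in C, I can change anything I want in memory from threads, as long as I make sure there are no races.
I did my homework on mutexes, but I can't wrap my head around using them when threads access the same array over and over again.
I'm using pthreads to split the workload over a big array a[N]. The number-crunching algorithm on a[N] is additive, so I split it with an a_diff[N_THREADS][N] array: each thread writes the changes it would apply to a[N] into its own row of a_diff, and after each step the rows are merged into a[N].
I need to run the crunching on different versions of the array a[N], so I pass them through the global pointer p (in the MWE there is only one a[N]).
I'm synchronizing the threads with another global array, SYNC_THREADS[N_THREADS], and I make sure the threads exit when needed by setting the global END_THREADS (I know, I use too many globals; I don't care, the code is about 200 lines). My question is about this synchronization technique: is it safe to do it this way? What is a cleaner/better/faster way to achieve this?
MWE: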
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define N_THREADS 3
#define N 10000000
#define STEPS 3

double a[N];                  // main array
double a_diff[N_THREADS][N];  // diffs array
double params[N];             // parameter used for number-crunching
double (*p)[N];               // pointer to array[N]

// structure for bounds for crunching the array
struct bounds {
    int lo;
    int hi;          // inclusive upper bound
    int thread_num;
};
struct bounds B[N_THREADS];

int SYNC_THREADS[N_THREADS];  // for syncing threads
int END_THREADS = 0;          // signal to terminate threads

static void *crunching(void *arg) {
    // multiple threads run number-crunching operations according to assigned low/high bounds
    struct bounds *data = (struct bounds *)arg;
    int lo = data->lo;
    int hi = data->hi;
    int thread_num = data->thread_num;
    printf("worker %d started for bounds [%d %d] \n", thread_num, lo, hi);
    int i;
    while (END_THREADS != 1) {                // END_THREADS tells threads to terminate
        if (SYNC_THREADS[thread_num] == 1) {  // SYNC_THREADS allows threads to start number-crunching
            printf("worker %d working... \n", thread_num);
            for (i = lo; i <= hi; ++i) {
                a_diff[thread_num][i] += (*p)[i] * params[i];  // pretend this is an expensive operation...
            }
            SYNC_THREADS[thread_num] = 0;     // thread disables itself until SYNC_THREADS is back to 1
            printf("worker %d stopped... \n", thread_num);
        }
    }
    return NULL;
}

int i, th, s;
double joiner;

int main(void) {
    // pre-fill arrays
    for (i = 0; i < N; ++i) {
        a[i] = i + 0.5;
        params[i] = 0.0;
    }
    // split workload between workers
    int worker_length = N / N_THREADS;
    for (i = 0; i < N_THREADS; ++i) {
        B[i].thread_num = i;
        B[i].lo = i * worker_length;
        if (i == N_THREADS - 1) {
            B[i].hi = N - 1;  // hi is inclusive, so the last valid index is N - 1
        } else {
            B[i].hi = i * worker_length + worker_length - 1;
        }
    }
    // pointer to parameters to be passed to worker
    struct bounds **data = malloc(N_THREADS * sizeof(struct bounds *));
    for (i = 0; i < N_THREADS; i++) {
        data[i] = malloc(sizeof(struct bounds));
        data[i]->lo = B[i].lo;
        data[i]->hi = B[i].hi;
        data[i]->thread_num = B[i].thread_num;
    }
    // create thread objects
    pthread_t threads[N_THREADS];
    // disallow threads to crunch numbers
    for (th = 0; th < N_THREADS; ++th) {
        SYNC_THREADS[th] = 0;
    }
    // launch workers
    for (th = 0; th < N_THREADS; th++) {
        pthread_create(&threads[th], NULL, crunching, data[th]);
    }
    // big loop of iterations
    for (s = 0; s < STEPS; ++s) {
        for (i = 0; i < N; ++i) {
            params[i] += 1.0;  // adjust parameters
        }
        // zero diff array
        for (i = 0; i < N; ++i) {
            for (th = 0; th < N_THREADS; ++th) {
                a_diff[th][i] = 0.0;
            }
        }
        p = &a;  // pointer to array a
        // allow threads to process numbers and wait for threads to complete
        for (th = 0; th < N_THREADS; ++th) { SYNC_THREADS[th] = 1; }
        // ...here threads started by pthread_create do calculations...
        for (th = 0; th < N_THREADS; th++) { while (SYNC_THREADS[th] != 0) {} }
        // join results from threads (number-crunching is additive)
        for (i = 0; i < N; ++i) {
            joiner = 0.0;
            for (th = 0; th < N_THREADS; ++th) {
                joiner += a_diff[th][i];
            }
            a[i] += joiner;
        }
    }
    // join workers
    END_THREADS = 1;
    for (th = 0; th < N_THREADS; th++) {
        pthread_join(threads[th], NULL);
    }
    for (i = 0; i < N_THREADS; i++) {
        free(data[i]);
    }
    free(data);
    return 0;
}
I see that the workers don't overlap in time:
worker 0 started for bounds [0 3333332]
worker 1 started for bounds [3333333 6666665]
worker 2 started for bounds [6666666 10000000]
worker 0 working...
worker 1 working...
worker 2 working...
worker 2 stopped...
worker 0 stopped...
worker 1 stopped...
worker 2 working...
worker 0 working...
worker 1 working...
worker 1 stopped...
worker 0 stopped...
worker 2 stopped...
worker 2 working...
worker 0 working...
worker 1 working...
worker 1 stopped...
worker 2 stopped...
worker 0 stopped...
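Process returned 0 (0x0)   execution time : 1.505 s
I made sure the workers' workspaces don't overlap by separating them into the a_diff[thread_num][N] sub-arrays, but I'm not sure this will always be the case, or that I'm not introducing a hidden race somewhere.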
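Posted on 2018-05-10 01:01:08
I don't know what the question is :-)
So the question is whether your SYNC_THREADS and END_THREADS synchronization mechanism is well thought out.
Yes! Almost. The problem is that the threads burn CPU while they wait.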
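Apart from the CPU cost there is a correctness caveat: SYNC_THREADS and END_THREADS are plain ints written by one thread and read by another without synchronization, which is a data race and therefore undefined behavior in C11. Among other things, an optimizing compiler is allowed to hoist the load out of an empty loop such as while (SYNC_THREADS[th] != 0) {}. If you want to keep the spin-wait for now, here is a minimal sketch using C11 <stdatomic.h>. The tiny sizes and the names spin_a, spin_diff, spin_go, spin_end, spin_worker and spin_run are hypothetical stand-ins for the question's a, a_diff, SYNC_THREADS and END_THREADS. The release/acquire pairs are what make the a_diff-style writes visible to main. It still busy-waits, so it does not address the CPU issue.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical, tiny sizes so the sketch runs instantly. */
#define SPIN_THREADS 3
#define SPIN_LEN 12

static double spin_a[SPIN_LEN];                  /* plays the role of a[N] */
static double spin_diff[SPIN_THREADS][SPIN_LEN]; /* plays the role of a_diff */
static atomic_int spin_go[SPIN_THREADS];         /* atomic version of SYNC_THREADS */
static atomic_int spin_end;                      /* atomic version of END_THREADS */
static int spin_ids[SPIN_THREADS];

static void *spin_worker(void *arg) {
    int t = *(const int *)arg;
    int lo = t * SPIN_LEN / SPIN_THREADS;        /* inclusive */
    int hi = (t + 1) * SPIN_LEN / SPIN_THREADS;  /* exclusive */
    while (!atomic_load_explicit(&spin_end, memory_order_acquire)) {
        if (atomic_load_explicit(&spin_go[t], memory_order_acquire) == 1) {
            for (int i = lo; i < hi; ++i)
                spin_diff[t][i] += spin_a[i];    /* stand-in for the expensive additive op */
            /* release: main sees the spin_diff writes once it reads 0 */
            atomic_store_explicit(&spin_go[t], 0, memory_order_release);
        }
        /* still a busy-wait: this loop occupies a core while idle */
    }
    return NULL;
}

/* Runs `steps` steps; each step doubles every element of spin_a. */
int spin_run(int steps) {
    pthread_t th[SPIN_THREADS];
    for (int i = 0; i < SPIN_LEN; ++i) spin_a[i] = 1.0;
    atomic_store(&spin_end, 0);
    for (int t = 0; t < SPIN_THREADS; ++t) {
        spin_ids[t] = t;
        atomic_store(&spin_go[t], 0);
        if (pthread_create(&th[t], NULL, spin_worker, &spin_ids[t]) != 0)
            return -1;
    }
    for (int s = 0; s < steps; ++s) {
        for (int t = 0; t < SPIN_THREADS; ++t)
            for (int i = 0; i < SPIN_LEN; ++i) spin_diff[t][i] = 0.0;
        for (int t = 0; t < SPIN_THREADS; ++t)   /* release: workers see zeroed diffs */
            atomic_store_explicit(&spin_go[t], 1, memory_order_release);
        for (int t = 0; t < SPIN_THREADS; ++t)   /* acquire: pairs with the worker's store */
            while (atomic_load_explicit(&spin_go[t], memory_order_acquire) != 0) { }
        for (int i = 0; i < SPIN_LEN; ++i)       /* merge, as in the question */
            for (int t = 0; t < SPIN_THREADS; ++t) spin_a[i] += spin_diff[t][i];
    }
    atomic_store_explicit(&spin_end, 1, memory_order_release);
    for (int t = 0; t < SPIN_THREADS; ++t) pthread_join(th[t], NULL);
    return 0;
}
```

Calling spin_run(3) should leave every element of spin_a equal to 8.0, since each step adds a copy of the array to itself.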
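Condition variables
To make threads wait for an event, use condition variables (pthread_cond). They come with a few useful functions, pthread_cond_wait(), pthread_cond_signal() and pthread_cond_broadcast() (wait, signal and broadcast for short):
- wait(&cond, &m) blocks the calling thread on the given condition variable (see Note 2).
- signal(&cond) wakes up one thread waiting on the given condition variable.
- broadcast(&cond) wakes up all threads waiting on the given condition variable.
Initially, all the threads wait (see Note 1):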
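pthread_mutex_lock(&mutex);
while (!start_threads)
    pthread_cond_wait(&cond_start, &mutex);
pthread_mutex_unlock(&mutex);
When the main thread is ready:
pthread_mutex_lock(&mutex);
start_threads = 1;
pthread_cond_broadcast(&cond_start);
pthread_mutex_unlock(&mutex);
Barriers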
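If there are data dependencies between iterations, you need to make sure that all threads are working on the same iteration at any given moment.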
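For the question's loop this means no worker may start step s + 1 before main has merged step s. A minimal sketch, assuming POSIX barriers (pthread_barrier_t): two barriers sized for the workers plus main replace SYNC_THREADS, and a plain flag that main sets before the start barrier replaces END_THREADS. The sizes and all pb_* names are hypothetical. pthread_barrier_wait() also synchronizes memory, so main sees the workers' pb_diff writes after pb_done, and waiting threads block instead of spinning.

```c
#define _POSIX_C_SOURCE 200809L  /* exposes pthread_barrier_t under strict -std=c11 */
#include <pthread.h>
#include <stddef.h>

/* Hypothetical sizes for the sketch. */
#define PB_THREADS 3
#define PB_LEN 12
#define PB_STEPS 3

static double pb_a[PB_LEN];                  /* plays the role of a[N] */
static double pb_diff[PB_THREADS][PB_LEN];   /* plays the role of a_diff */
static pthread_barrier_t pb_start, pb_done;  /* each sized PB_THREADS + 1 (workers + main) */
static int pb_stop;                          /* replaces END_THREADS; written only by main */
static int pb_ids[PB_THREADS];

static void *pb_worker(void *arg) {
    int t = *(const int *)arg;
    int lo = t * PB_LEN / PB_THREADS, hi = (t + 1) * PB_LEN / PB_THREADS;
    for (;;) {
        pthread_barrier_wait(&pb_start);   /* wait until main has prepared the step */
        if (pb_stop) return NULL;          /* main sets pb_stop before this barrier */
        for (int i = lo; i < hi; ++i)
            pb_diff[t][i] += pb_a[i];      /* stand-in for the expensive additive op */
        pthread_barrier_wait(&pb_done);    /* report this slice as finished */
    }
}

int pb_run(void) {
    pthread_t th[PB_THREADS];
    pthread_barrier_init(&pb_start, NULL, PB_THREADS + 1);
    pthread_barrier_init(&pb_done, NULL, PB_THREADS + 1);
    pb_stop = 0;
    for (int i = 0; i < PB_LEN; ++i) pb_a[i] = 1.0;
    for (int t = 0; t < PB_THREADS; ++t) {
        pb_ids[t] = t;
        if (pthread_create(&th[t], NULL, pb_worker, &pb_ids[t]) != 0) return -1;
    }
    for (int s = 0; s < PB_STEPS; ++s) {
        for (int t = 0; t < PB_THREADS; ++t)   /* workers are parked at pb_start here */
            for (int i = 0; i < PB_LEN; ++i) pb_diff[t][i] = 0.0;
        pthread_barrier_wait(&pb_start);       /* release the workers for this step */
        pthread_barrier_wait(&pb_done);        /* block until every slice is done */
        for (int i = 0; i < PB_LEN; ++i)       /* merge, as in the question */
            for (int t = 0; t < PB_THREADS; ++t) pb_a[i] += pb_diff[t][i];
    }
    pb_stop = 1;
    pthread_barrier_wait(&pb_start);           /* workers wake up, see pb_stop, exit */
    for (int t = 0; t < PB_THREADS; ++t) pthread_join(th[t], NULL);
    pthread_barrier_destroy(&pb_start);
    pthread_barrier_destroy(&pb_done);
    return 0;
}
```

Each step adds the array to itself, so after PB_STEPS = 3 steps every element of pb_a should be 8.0.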
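To synchronize the threads at the end of each iteration, take a look at barriers (pthread_barrier):
- pthread_barrier_init(&b, NULL, count): initializes barrier b to synchronize count threads.
- pthread_barrier_wait(&b): a thread waits here until all count threads have reached the barrier. It then returns PTHREAD_BARRIER_SERIAL_THREAD in exactly one of them and 0 in the others.
Extending the functionality of barriers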
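Sometimes you may want the last thread that reaches a barrier to compute something (for example, increment an iteration counter, compute some global value, or check whether execution should stop). You have two options.
Using pthread_barriers
Essentially, you need two barrier waits. After the first, the single thread that got PTHREAD_BARRIER_SERIAL_THREAD does the extra work (pthread_barrier_wait() does not promise that it is the last one to arrive, only that exactly one thread gets this value); the second wait keeps every thread from reading stop before it has been set: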
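int rc = pthread_barrier_wait(&b);
if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD)
    abort();  // the wait itself failed
if (rc == PTHREAD_BARRIER_SERIAL_THREAD)  // exactly one thread gets this value
    if (shouldStop()) stop = 1;
pthread_barrier_wait(&b);  // second wait: all threads now see the final value of stop
if (stop) return NULL;
Implementing our own specialized barrier with pthread_cond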
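pthread_mutex_lock(&mutex);
remainingThreads--;
// all threads execute this
executedByAllThreads();
if (remainingThreads == 0) {
    // only the last thread executes this
    if (shouldStop()) stop = 1;
    // reinitialize the barrier for the next round
    remainingThreads = N_THREADS;
    generation++;
    pthread_cond_broadcast(&cond);
} else {
    // remainingThreads is already back at N_THREADS when the waiters wake up,
    // so wait for the generation counter to change instead
    unsigned myGeneration = generation;
    while (myGeneration == generation)
        pthread_cond_wait(&cond, &mutex);
}
pthread_mutex_unlock(&mutex);
Note 1: Why is pthread_cond_wait() inside a while loop? It may look a bit odd. The reason is spurious wakeups: the function may return even though no signal() or broadcast() was issued. So, for correctness, there is usually an extra variable (the predicate) which guarantees that a thread that wakes up when it shouldn't goes back into pthread_cond_wait().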
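From the manual:
When using condition variables there is always a Boolean predicate involving shared variables associated with each condition wait that is true if the thread should proceed. Spurious wakeups from the pthread_cond_timedwait() or pthread_cond_wait() functions may occur. Since the return from pthread_cond_timedwait() or pthread_cond_wait() does not imply anything about the value of this predicate, the predicate should be re-evaluated upon such return. (...) If a signal is delivered to a thread waiting for a condition variable, upon return from the signal handler the thread resumes waiting for the condition variable as if it was not interrupted, or it shall return zero due to spurious wakeup.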
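A self-contained sketch of a pthread_cond-based barrier that follows this rule, using hypothetical names (cond_barrier, cond_barrier_init, cond_barrier_wait and the demo_* helpers). The predicate re-checked in the while loop is a generation counter rather than the remaining-thread count. It survives spurious wakeups, and it keeps working when the barrier is reused, because the last thread resets the count to its full value before broadcasting. The on_last callback plays the role of shouldStop(): it runs in the last thread, under the mutex, before anyone is released.

```c
#include <pthread.h>
#include <stddef.h>

/* Hypothetical reusable barrier built from a mutex and a condition variable. */
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int count;           /* threads that must arrive each round */
    int remaining;       /* threads still missing in the current round */
    unsigned generation; /* bumped by the last arriver; the wait predicate */
} cond_barrier;

void cond_barrier_init(cond_barrier *b, int count) {
    pthread_mutex_init(&b->mutex, NULL);
    pthread_cond_init(&b->cond, NULL);
    b->count = b->remaining = count;
    b->generation = 0;
}

/* The last thread to arrive runs on_last(ctx) under the mutex before anyone is
   released, so every thread can read its results after this returns. */
void cond_barrier_wait(cond_barrier *b, void (*on_last)(void *), void *ctx) {
    pthread_mutex_lock(&b->mutex);
    if (--b->remaining == 0) {
        if (on_last) on_last(ctx);
        b->remaining = b->count;          /* reinitialize for the next round */
        b->generation++;
        pthread_cond_broadcast(&b->cond);
    } else {
        unsigned my_gen = b->generation;
        while (my_gen == b->generation)   /* spurious wakeups just loop back */
            pthread_cond_wait(&b->cond, &b->mutex);
    }
    pthread_mutex_unlock(&b->mutex);
}

/* Demo: threads keep passing the barrier until the last arriver decides to stop. */
#define DEMO_THREADS 4
#define DEMO_ROUNDS 5
static cond_barrier demo_b;
static int demo_rounds_done;  /* written only inside on_last, under the mutex */
static int demo_stop;

static void demo_on_last(void *ctx) {
    (void)ctx;
    if (++demo_rounds_done == DEMO_ROUNDS)
        demo_stop = 1;        /* stand-in for shouldStop() */
}

static void *demo_worker(void *arg) {
    (void)arg;
    for (;;) {
        cond_barrier_wait(&demo_b, demo_on_last, NULL);
        if (demo_stop) return NULL;
    }
}

int demo_run(void) {
    pthread_t th[DEMO_THREADS];
    cond_barrier_init(&demo_b, DEMO_THREADS);
    demo_rounds_done = 0;
    demo_stop = 0;
    for (int t = 0; t < DEMO_THREADS; ++t)
        if (pthread_create(&th[t], NULL, demo_worker, NULL) != 0) return -1;
    for (int t = 0; t < DEMO_THREADS; ++t) pthread_join(th[t], NULL);
    return demo_rounds_done;
}
```

With a while (remaining > 0) predicate instead, the waiters of the first round would never leave, because the count is already back at its full value when they wake up; demo_run() would hang rather than return 5.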
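Note 2:
Michael pointed out in the comments that a companion lock should be held whenever the predicate (start_threads) is modified and around pthread_cond_wait(). pthread_cond_wait() releases the mutex when it is called and reacquires it before returning.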
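A minimal, self-contained version of the start gate with the lock held as described, assuming the hypothetical names gate_mutex, gate_passed, gate_worker and gate_run (start_threads and cond_start are the names used in the snippets above). It does not matter whether the broadcast happens before or after a worker reaches the wait: the worker checks start_threads under the lock first, so it either sees 1 or is already waiting when the broadcast arrives.

```c
#include <pthread.h>
#include <stddef.h>

#define GATE_MAX 16  /* hypothetical upper bound on workers for the sketch */

static pthread_mutex_t gate_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond_start = PTHREAD_COND_INITIALIZER;
static int start_threads = 0;  /* the predicate, only touched under gate_mutex */
static int gate_passed = 0;    /* workers that got through, also under the mutex */

static void *gate_worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&gate_mutex);
    while (!start_threads)                            /* re-check after every wakeup (Note 1) */
        pthread_cond_wait(&cond_start, &gate_mutex);  /* releases the mutex while blocked */
    gate_passed++;
    pthread_mutex_unlock(&gate_mutex);
    return NULL;
}

/* Starts n workers, opens the gate, joins them; returns how many passed. */
int gate_run(int n) {
    pthread_t th[GATE_MAX];
    if (n < 0 || n > GATE_MAX) return -1;
    for (int t = 0; t < n; ++t)
        if (pthread_create(&th[t], NULL, gate_worker, NULL) != 0) return -1;
    pthread_mutex_lock(&gate_mutex);
    start_threads = 1;                    /* modify the predicate under the lock (Note 2) */
    pthread_cond_broadcast(&cond_start);
    pthread_mutex_unlock(&gate_mutex);
    for (int t = 0; t < n; ++t) pthread_join(th[t], NULL);
    return gate_passed;
}
```

gate_run(4) should return 4 regardless of how the threads are scheduled.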
PS: It's a bit late here, sorry if my text is a bit messy :-)
https://stackoverflow.com/questions/50262651