我有一个多线程程序,它的2个线程通过一个消息队列相互通信。第一个线程(发送者)定期发送消息,而第二个线程(接收者)处理信息。
发送者的代码类似于:
// Create queue
key_t key = ftok("/tmp", 'B');
int msqid = msgget(key, 0664 | IPC_CREAT);
// Create message and send
struct request_msg req_msg;
req_msg.mtype = 1;
snprintf(req_msg.mtext, MSG_LENGTH, "Send this information");
msgsnd(msqid, &req_msg, strlen(req_msg.mtext) + 1, 0);在接收线程上,我这样做:
// Subscribe to queue
key_t key = ftok("/tmp", 'B');
int msqid = msgget(key, 0664);
struct request_msg req_msg;
while(running)
{
msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, 0);
// Do sth with the message
}正如您所看到的,接收器位于一个while循环中,该循环由一个名为"running“的全局变量控制。如果在进程中遇到错误,错误处理程序会将布尔值设置为false。这在大多数情况下都是有效的,但是如果在能够向队列发送消息之前发生错误,接收器将不会退出while循环,因为它在继续之前等待消息,从而检查正在运行的变量。这意味着它将永远挂在那里,因为发送方在运行时的剩余时间内不会发送任何内容。
我想避免这种情况,但我不知道如何让msgrcv知道它不能期待更多的消息。假设这是最简单的版本,我无法了解如果我杀死队列,msgrcv是如何运行的。也许超时或发送某种终止消息(可能使用消息结构的mtype成员)也是可能的。
请让我知道这个问题最可靠的解决方案是什么。谢谢!
编辑:根据建议,我重新编写了代码,使信号处理程序成为原子操作。
#include <stdbool.h> // bool data type
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0
struct message
{
uint64_t iteration;
char req_time[28];
};
static volatile bool running = true;
static volatile bool work = false;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;
static struct message msg;
pthread_mutex_t mutexmsg;
pthread_cond_t data_updated_cv;
static void
termination_handler(int signum)
{
running = false;
}
static void
alarm_handler(int signum)
{
work = true;
}
static void
write_msg(void)
{
// Reset the alarm interval
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
raise(SIGTERM);
return;
}
struct timeval current_time;
gettimeofday(¤t_time, NULL);
printf("\nLoop count: %lu\n", loop_count);
printf("Loop time: %f us\n", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
(current_time.tv_usec - previous_time.tv_usec));
previous_time = current_time;
// format timeval struct
char tmbuf[64];
time_t nowtime = current_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
// write values
pthread_mutex_lock(&mutexmsg);
msg.iteration = loop_count;
snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, current_time.tv_usec);
pthread_cond_signal(&data_updated_cv);
pthread_mutex_unlock(&mutexmsg);
loop_count++;
}
static void*
process_msg(void *args)
{
while(1)
{
pthread_mutex_lock(&mutexmsg);
printf("Waiting for condition\n");
pthread_cond_wait(&data_updated_cv, &mutexmsg);
printf("Condition fulfilled\n");
if(!running)
{
break;
}
struct timeval process_time;
gettimeofday(&process_time, NULL);
char tmbuf[64];
char buf[64];
time_t nowtime = process_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, process_time.tv_usec);
// something that takes longer than the interval time
// sleep(1);
printf("[%s] Req time: %s loop cnt: %lu\n", buf, msg.req_time, msg.iteration);
pthread_mutex_unlock(&mutexmsg);
}
pthread_exit(NULL);
}
int
main(int argc, char* argv[])
{
pthread_t thread_id;
pthread_attr_t attr;
// for portability, set thread explicitly as joinable
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
{
perror("pthread_create");
exit(1);
}
pthread_attr_destroy(&attr);
// signal handling setup
struct sigaction t;
t.sa_handler = termination_handler;
sigemptyset(&t.sa_mask);
t.sa_flags = 0;
sigaction(SIGINT, &t, NULL);
sigaction(SIGTERM, &t, NULL);
struct sigaction a;
a.sa_handler = alarm_handler;
sigemptyset(&a.sa_mask);
a.sa_flags = 0;
sigaction(SIGALRM, &a, NULL);
// Set the alarm interval
alarm_interval.it_interval.tv_sec = 0;
alarm_interval.it_interval.tv_usec = 0;
alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
gettimeofday(&previous_time, NULL);
while(1)
{
// suspending main thread until a signal is caught
pause();
if(!running)
{
// signal the worker thread to stop execution
pthread_mutex_lock(&mutexmsg);
pthread_cond_signal(&data_updated_cv);
pthread_mutex_unlock(&mutexmsg);
break;
}
if(work)
{
write_msg();
work = false;
}
}
// suspend thread until the worker thread joins back in
pthread_join(thread_id, NULL);
// reset the timer
alarm_interval.it_value.tv_sec = 0;
alarm_interval.it_value.tv_usec = 0;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
printf("EXIT\n");
pthread_exit(NULL);
}发布于 2020-11-27 12:15:36
除了作为同步原语之外,您还没有证明使用消息队列是合理的。您可以通过一个变量和一个原子标志来传递消息,以指示消息就绪。然后,This answer描述了如何使用条件变量实现线程挂起和恢复。这通常是在线程之间完成的,尽管当然不是唯一的方式。
我不知道如何让msgrcv知道它不能期待更多的消息。
没必要这么做。只需发送一条消息,通知线程结束!running变量并不适用:您正在尝试与另一个线程通信,因此请按您选择的方式进行通信:消息!
发布于 2020-11-27 13:05:10
对问题中新提案的回答
因此,这里是具有上述修复/增强的最后一个建议:
#include <stdbool.h> // bool data type
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0
struct message
{
uint64_t iteration;
char req_time[28];
};
static volatile bool running = true;
static volatile bool work = false;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;
static struct message msg;
pthread_mutex_t mutexmsg;
pthread_cond_t data_updated_cv;
static void
termination_handler(int signum)
{
running = false;
}
static void
alarm_handler(int signum)
{
work = true;
}
static void
write_msg(void)
{
struct timeval current_time;
gettimeofday(¤t_time, NULL);
printf("\nLoop count: %lu\n", loop_count);
printf("Loop time: %f us\n", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
(current_time.tv_usec - previous_time.tv_usec));
previous_time = current_time;
// format timeval struct
char tmbuf[64];
time_t nowtime = current_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
// write values
pthread_mutex_lock(&mutexmsg);
msg.iteration = loop_count;
snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, current_time.tv_usec);
pthread_cond_signal(&data_updated_cv);
pthread_mutex_unlock(&mutexmsg);
loop_count++;
}
static void*
process_msg(void *args)
{
while(1)
{
pthread_mutex_lock(&mutexmsg);
printf("Waiting for condition\n");
pthread_cond_wait(&data_updated_cv, &mutexmsg);
printf("Condition fulfilled\n");
if(!running)
{
pthread_mutex_unlock(&mutexmsg); // <----- To avoid deadlock
break;
}
struct timeval process_time;
gettimeofday(&process_time, NULL);
char tmbuf[64];
char buf[64];
time_t nowtime = process_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, process_time.tv_usec);
// something that takes longer than the interval time
//sleep(2);
printf("[%s] Req time: %s loop cnt: %lu\n", buf, msg.req_time, msg.iteration);
pthread_mutex_unlock(&mutexmsg);
}
printf("Thread exiting...\n");
pthread_exit(NULL);
}
int
main(int argc, char* argv[])
{
pthread_t thread_id;
pthread_attr_t attr;
// for portability, set thread explicitly as joinable
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
{
perror("pthread_create");
exit(1);
}
pthread_attr_destroy(&attr);
// signal handling setup
struct sigaction t;
t.sa_handler = termination_handler;
sigemptyset(&t.sa_mask);
t.sa_flags = 0;
sigaction(SIGINT, &t, NULL);
sigaction(SIGTERM, &t, NULL);
struct sigaction a;
a.sa_handler = alarm_handler;
sigemptyset(&a.sa_mask);
a.sa_flags = 0;
sigaction(SIGALRM, &a, NULL);
// Set the alarm interval
alarm_interval.it_interval.tv_sec = 0;
alarm_interval.it_interval.tv_usec = 0;
alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
gettimeofday(&previous_time, NULL);
while(1)
{
// Reset the alarm interval <-------- Rearm the timer in the main loop
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
raise(SIGTERM);
break;
}
// suspending main thread until a signal is caught
pause();
if(!running)
{
// signal the worker thread to stop execution
pthread_mutex_lock(&mutexmsg);
pthread_cond_signal(&data_updated_cv);
pthread_mutex_unlock(&mutexmsg);
break;
}
if(work)
{
write_msg();
work = false;
}
}
// suspend thread until the worker thread joins back in
pthread_join(thread_id, NULL);
// reset the timer
alarm_interval.it_value.tv_sec = 0;
alarm_interval.it_value.tv_usec = 0;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
printf("EXIT\n");
pthread_exit(NULL);
}==================================================================
对原始问题的回答
可以使用conditional variable等待来自发送者的信号。这会使接收器唤醒,并通过在msgrcv()的参数中传递IPC_NOWAIT来检查消息队列中的消息。要结束通信,可以发布“通信结束”消息。也可以使用pthread_con_timedwait()定期唤醒并检查“通信结束”或“接收器结束条件”(例如,通过检查您的全局“运行”变量)。
接收端:
// Mutex initialization
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Condition variable initialization
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
[...]
while (1) {
// Lock the mutex
pthread_mutex_lock(&mutex);
// Check for messages (non blocking thanks to IPC_NOWAIT)
rc = msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, IPC_NOWAIT);
if (rc == -1) {
if (errno == ENOMSG) {
// message queue empty
// Wait for message notification
pthread_cond_wait(&cond, &mutex); // <--- pthread_cond_timedwait() can be used to wake up and check for the end of communication or senders...
} else {
// Error
}
}
// Handle the message, end of communication (e.g. "running" variable)...
// Release the lock (so that the sender can post something in the queue)
pthread_mutex_unlock(&mutex);
}发送方:
// Prepare the message
[...]
// Take the lock
pthread_mutex_lock(&mutex);
// Send the message
msgsnd(msqid, &req_msg, strlen(req_msg.mtext) + 1, 0);
// Wake up the receiver
pthread_cond_signal(&cond);
// Release the lock
pthread_mutex_unlock(&mutex);N.B.:SYSV message queues已过时。最好使用全新的Posix services。
发布于 2020-11-29 06:20:57
我花了最后一天的时间阅读了很多关于线程和互斥的内容,并试图让我的示例程序正常工作。是的,但不幸的是,当我试图通过Ctrl+C关闭它时,它被卡住了。原因是(再次)这一次,工作线程等待来自主线程的信号,而主线程将不再发送信号。
@Rachid K.和@Unslander Monica:如果你想再看一遍,这是不是更先进的代码?此外,我认为我必须使用pthread_cond_timedwait而不是pthread_cond_wait来避免终止死锁。你能告诉我具体怎么处理吗?
注意,程序只是周期性地(间隔1s)将一个时间戳和一个循环计数器传递给打印数据的处理线程。输出还会显示调用print的时间。
再次感谢!
#include <stdbool.h> // bool data type
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0
static bool running = true;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;
struct message
{
uint64_t iteration;
char req_time[28];
} msg;
pthread_mutex_t mutexmsg;
pthread_cond_t data_updated_cv;
static void
signal_handler(int signum)
{
if (signum == SIGINT || signum == SIGTERM)
{
running = false;
}
}
static void
write_msg(int signum)
{
if(!running)
{
return;
}
// Reset the alarm interval
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
raise(SIGTERM);
return;
}
struct timeval current_time;
gettimeofday(¤t_time, NULL);
printf("\nLoop count: %lu\n", loop_count);
printf("Loop time: %f us\n", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
(current_time.tv_usec - previous_time.tv_usec));
previous_time = current_time;
// format timeval struct
char tmbuf[64];
time_t nowtime = current_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
// write values
pthread_mutex_lock(&mutexmsg);
msg.iteration = loop_count;
snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, current_time.tv_usec);
pthread_cond_signal(&data_updated_cv);
pthread_mutex_unlock(&mutexmsg);
loop_count++;
}
static void*
process_msg(void *args)
{
while(running)
{
pthread_mutex_lock(&mutexmsg);
printf("Waiting for condition\n");
pthread_cond_wait(&data_updated_cv, &mutexmsg);
printf("Condition fulfilled\n");
struct timeval process_time;
gettimeofday(&process_time, NULL);
char tmbuf[64];
char buf[64];
time_t nowtime = process_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, process_time.tv_usec);
printf("[%s] Message req time: %s loop cnt: %lu\n", buf, msg.req_time, msg.iteration);
pthread_mutex_unlock(&mutexmsg);
}
pthread_exit(NULL);
}
int
main(int argc, char* argv[])
{
pthread_t thread_id;
pthread_attr_t attr;
// for portability, set thread explicitly as joinable
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
{
perror("pthread_create");
exit(1);
}
pthread_attr_destroy(&attr);
// signal handling setup
struct sigaction s;
s.sa_handler = signal_handler;
sigemptyset(&s.sa_mask);
s.sa_flags = 0;
sigaction(SIGINT, &s, NULL);
sigaction(SIGTERM, &s, NULL);
struct sigaction a;
a.sa_handler = write_msg;
sigemptyset(&a.sa_mask);
a.sa_flags = 0;
sigaction(SIGALRM, &a, NULL);
// Set the alarm interval
alarm_interval.it_interval.tv_sec = 0;
alarm_interval.it_interval.tv_usec = 0;
alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
gettimeofday(&previous_time, NULL);
// suspend thread until the worker thread joins back in
pthread_join(thread_id, NULL);
// reset the timer
alarm_interval.it_value.tv_sec = 0;
alarm_interval.it_value.tv_usec = 0;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
pthread_exit(NULL);
return 0;
}https://stackoverflow.com/questions/65030003
复制相似问题