首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >防止msgrcv无限期等待

防止msgrcv无限期等待
EN

Stack Overflow用户
提问于 2020-11-27 06:33:57
回答 4查看 382关注 0票数 1

我有一个多线程程序,它的2个线程通过一个消息队列相互通信。第一个线程(发送者)定期发送消息,而第二个线程(接收者)处理信息。

发送者的代码类似于:

代码语言:c
复制
// Create queue
key_t key = ftok("/tmp", 'B');
int msqid = msgget(key, 0664 | IPC_CREAT);

// Create message and send
struct request_msg req_msg;
req_msg.mtype = 1;
snprintf(req_msg.mtext, MSG_LENGTH, "Send this information");
msgsnd(msqid, &req_msg, strlen(req_msg.mtext) + 1, 0);

在接收线程上,我这样做:

代码语言:c
复制
// Subscribe to queue
key_t key = ftok("/tmp", 'B');
int msqid = msgget(key, 0664);

struct request_msg req_msg;

while(running)
{
    msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, 0);
    // Do sth with the message
}

正如您所看到的,接收线程位于一个 while 循环中,该循环由名为 "running" 的全局变量控制。如果进程中出现错误,错误处理程序会把这个布尔值设为 false。这在大多数情况下都有效,但如果错误发生在发送方向队列发送任何消息之前,接收方就不会退出 while 循环:它必须先收到一条消息,才能回到循环开头检查 running 变量。这意味着它将永远挂在那里,因为在程序剩余的运行时间内,发送方不会再发送任何消息。

我想避免这种情况,但我不知道如何让 msgrcv 知道它不会再收到任何消息。我猜最简单的办法是删除队列,但我查不到删除队列后 msgrcv 会如何表现。也许也可以使用超时,或者发送某种终止消息(例如利用消息结构的 mtype 成员)。

请让我知道这个问题最可靠的解决方案是什么。谢谢!

编辑:根据建议,我重写了代码,让信号处理程序只对标志变量做原子赋值,实际工作放到主循环中完成。

代码语言:c
复制
#include <stdbool.h> // bool data type
#include <stdatomic.h> // atomic_bool
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h> // setitimer(), gettimeofday(), struct itimerval
#include <time.h>     // localtime_r(), strftime()


#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0

/* Payload handed from the main thread to the worker; guarded by mutexmsg. */
struct message 
{
    uint64_t iteration;
    char req_time[28];  // "YYYY-MM-DD HH:MM:SS.uuuuuu" (26 chars) + NUL
};

/* `running` is written by a signal handler and read by the worker thread:
   a (lock-free) atomic is valid in both contexts, unlike volatile bool. */
static atomic_bool running = true;
/* `work` is only shared between a signal handler and the main thread, for
   which ISO C guarantees volatile sig_atomic_t. */
static volatile sig_atomic_t work = false;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;
static struct message msg;

/* POSIX requires synchronisation objects to be initialised before use. */
static pthread_mutex_t mutexmsg = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t data_updated_cv = PTHREAD_COND_INITIALIZER;


/* SIGINT/SIGTERM handler: its only job is to clear the `running` flag. */
static void
termination_handler(int signum)
{
    (void)signum;   /* installed for both signals; the number is not needed */
    running = false;
}


/* SIGALRM handler: only raises the `work` flag, so the actual work is done
   by the main loop outside signal context. */
static void 
alarm_handler(int signum)
{
    (void)signum;
    work = true;
}


/* Called from the main loop after SIGALRM: re-arms the timer, prints loop
   statistics and publishes the current iteration and timestamp into `msg`
   for the worker thread, then wakes it. */
static void
write_msg(void)
{
    // Reset the alarm interval
    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        raise(SIGTERM);
        return;
    }

    struct timeval current_time;
    gettimeofday(&current_time, NULL);
    // %lu does not match uint64_t on every platform; cast to a fixed type.
    printf("\nLoop count: %llu\n", (unsigned long long)loop_count);
    printf("Loop time: %f us\n", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
                           (current_time.tv_usec - previous_time.tv_usec));
    previous_time = current_time;

    // format timeval struct. localtime_r(): the worker thread formats
    // timestamps concurrently, and localtime() returns a shared static buffer.
    char tmbuf[64] = "";
    time_t nowtime = current_time.tv_sec;
    struct tm nowtm;
    if(localtime_r(&nowtime, &nowtm) != NULL)
    {
        strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", &nowtm);
    }

    // write values
    pthread_mutex_lock(&mutexmsg);
    msg.iteration = loop_count;
    snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, (long)current_time.tv_usec);
    pthread_cond_signal(&data_updated_cv);
    pthread_mutex_unlock(&mutexmsg);

    loop_count++;
}


/* Worker thread: waits until write_msg() publishes a new `msg`, prints it,
   and exits once `running` has been cleared. `args` is unused. */
static void* 
process_msg(void *args)
{
    (void)args;
    // msg.iteration of the next message this thread has not printed yet.
    uint64_t next_iteration = 0;

    while(1)
    {
        pthread_mutex_lock(&mutexmsg);

        printf("Waiting for condition\n");
        // Wait on a predicate, never on the bare signal. pthread_cond_wait()
        // may wake spuriously. A signal sent while this thread was not waiting
        // (e.g. the stop request) would otherwise be lost and block it forever.
        // `msg` has static storage, so an empty req_time means nothing has
        // been published yet.
        while(running && (msg.req_time[0] == '\0' || msg.iteration < next_iteration))
        {
            pthread_cond_wait(&data_updated_cv, &mutexmsg);
        }
        printf("Condition fulfilled\n");

        if(!running)
        {
            // Release the mutex before leaving, or the main thread deadlocks on it.
            pthread_mutex_unlock(&mutexmsg);
            break;
        }
        next_iteration = msg.iteration + 1;

        struct timeval process_time;
        gettimeofday(&process_time, NULL);

        // localtime_r(): the main thread formats timestamps concurrently.
        char tmbuf[64] = "";
        char buf[64];
        time_t nowtime = process_time.tv_sec;
        struct tm nowtm;
        if(localtime_r(&nowtime, &nowtm) != NULL)
        {
            strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", &nowtm);
        }
        snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, (long)process_time.tv_usec);

        // something that takes longer than the interval time
        // sleep(1);

        printf("[%s] Req time: %s loop cnt: %llu\n", buf, msg.req_time,
               (unsigned long long)msg.iteration);
        pthread_mutex_unlock(&mutexmsg);

    }

    pthread_exit(NULL);
}



int
main(int argc, char* argv[])
{
    pthread_t thread_id;
    pthread_attr_t attr;

    // for portability, set thread explicitly as joinable
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
    {
        perror("pthread_create");
        exit(1);
    }

    pthread_attr_destroy(&attr);

    // signal handling setup
    struct sigaction t;
    t.sa_handler = termination_handler;
    sigemptyset(&t.sa_mask);
    t.sa_flags = 0;
    sigaction(SIGINT, &t, NULL);
    sigaction(SIGTERM, &t, NULL);

    struct sigaction a;
    a.sa_handler = alarm_handler;
    sigemptyset(&a.sa_mask);
    a.sa_flags = 0;
    sigaction(SIGALRM, &a, NULL);
    
    // Set the alarm interval
    alarm_interval.it_interval.tv_sec = 0;
    alarm_interval.it_interval.tv_usec = 0;
    alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
    alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;

    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    gettimeofday(&previous_time, NULL);

    while(1)
    {
        // suspending main thread until a signal is caught
        pause();

        if(!running)
        {
            // signal the worker thread to stop execution
            pthread_mutex_lock(&mutexmsg);
            pthread_cond_signal(&data_updated_cv);
            pthread_mutex_unlock(&mutexmsg);

            break;
        }

        if(work)
        {
            write_msg();
            work = false;
        }
    }

    // suspend thread until the worker thread joins back in
    pthread_join(thread_id, NULL);

    // reset the timer
    alarm_interval.it_value.tv_sec = 0;
    alarm_interval.it_value.tv_usec = 0;
    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    printf("EXIT\n");
    pthread_exit(NULL);
    
}
EN

回答 4

Stack Overflow用户

发布于 2020-11-27 12:15:36

除了把消息队列当作同步原语之外,您并没有说明使用它的理由。您可以通过一个共享变量加一个原子标志来传递消息,用该标志表示消息已就绪。This answer 描述了如何使用条件变量实现线程的挂起和恢复。线程之间通常就是这样做的,当然这不是唯一的方式。

我不知道如何让msgrcv知道它不能期待更多的消息。

没必要这么做。只需发送一条消息,通知线程结束!running变量并不适用:您正在尝试与另一个线程通信,因此请按您选择的方式进行通信:消息!

票数 1
EN

Stack Overflow用户

发布于 2020-11-27 13:05:10

对问题中新提案的回答

  • 周期性计时器应在主线程的事件循环中重新设置(re-arm),这样代码的逻辑更清晰(这只是一个主观建议);
  • 当辅助线程退出其循环时,它必须释放互斥锁,否则主线程会死锁(一直等待已经退出的辅助线程仍然持有的互斥锁)。

因此,这里是具有上述修复/增强的最后一个建议:

代码语言:c
复制
#include <stdbool.h> // bool data type
#include <stdatomic.h> // atomic_bool
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h> // setitimer(), gettimeofday(), struct itimerval
#include <time.h>     // localtime_r(), strftime()


#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0

/* Payload handed from the main thread to the worker; guarded by mutexmsg. */
struct message 
{
    uint64_t iteration;
    char req_time[28];  // "YYYY-MM-DD HH:MM:SS.uuuuuu" (26 chars) + NUL
};

/* `running` is written by a signal handler and read by the worker thread:
   a (lock-free) atomic is valid in both contexts, unlike volatile bool. */
static atomic_bool running = true;
/* `work` is only shared between a signal handler and the main thread, for
   which ISO C guarantees volatile sig_atomic_t. */
static volatile sig_atomic_t work = false;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;
static struct message msg;

/* POSIX requires synchronisation objects to be initialised before use. */
static pthread_mutex_t mutexmsg = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t data_updated_cv = PTHREAD_COND_INITIALIZER;


/* SIGINT/SIGTERM handler: its only job is to clear the `running` flag. */
static void
termination_handler(int signum)
{
    (void)signum;   /* installed for both signals; the number is not needed */
    running = false;
}


/* SIGALRM handler: only raises the `work` flag, so the actual work is done
   by the main loop outside signal context. */
static void 
alarm_handler(int signum)
{
    (void)signum;
    work = true;
}


/* Called from the main loop after SIGALRM: prints loop statistics and
   publishes the current iteration and timestamp into `msg` for the worker
   thread, then wakes it. */
static void
write_msg(void)
{
    struct timeval current_time;
    gettimeofday(&current_time, NULL);
    // %lu does not match uint64_t on every platform; cast to a fixed type.
    printf("\nLoop count: %llu\n", (unsigned long long)loop_count);
    printf("Loop time: %f us\n", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
                           (current_time.tv_usec - previous_time.tv_usec));
    previous_time = current_time;

    // format timeval struct. localtime_r(): the worker thread formats
    // timestamps concurrently, and localtime() returns a shared static buffer.
    char tmbuf[64] = "";
    time_t nowtime = current_time.tv_sec;
    struct tm nowtm;
    if(localtime_r(&nowtime, &nowtm) != NULL)
    {
        strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", &nowtm);
    }

    // write values
    pthread_mutex_lock(&mutexmsg);
    msg.iteration = loop_count;
    snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, (long)current_time.tv_usec);
    pthread_cond_signal(&data_updated_cv);
    pthread_mutex_unlock(&mutexmsg);

    loop_count++;
}


/* Worker thread: waits until write_msg() publishes a new `msg`, prints it,
   and exits once `running` has been cleared. `args` is unused. */
static void* 
process_msg(void *args)
{
    (void)args;
    // msg.iteration of the next message this thread has not printed yet.
    uint64_t next_iteration = 0;

    while(1)
    {
        pthread_mutex_lock(&mutexmsg);

        printf("Waiting for condition\n");
        // Wait on a predicate, never on the bare signal. pthread_cond_wait()
        // may wake spuriously. A signal sent while this thread was not waiting
        // (e.g. the stop request) would otherwise be lost and block it forever.
        // `msg` has static storage, so an empty req_time means nothing has
        // been published yet.
        while(running && (msg.req_time[0] == '\0' || msg.iteration < next_iteration))
        {
            pthread_cond_wait(&data_updated_cv, &mutexmsg);
        }
        printf("Condition fulfilled\n");

        if(!running)
        {
            pthread_mutex_unlock(&mutexmsg); // <----- To avoid deadlock
            break;
        }
        next_iteration = msg.iteration + 1;

        struct timeval process_time;
        gettimeofday(&process_time, NULL);

        // localtime_r(): the main thread formats timestamps concurrently.
        char tmbuf[64] = "";
        char buf[64];
        time_t nowtime = process_time.tv_sec;
        struct tm nowtm;
        if(localtime_r(&nowtime, &nowtm) != NULL)
        {
            strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", &nowtm);
        }
        snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, (long)process_time.tv_usec);

        // something that takes longer than the interval time
        //sleep(2);

        printf("[%s] Req time: %s loop cnt: %llu\n", buf, msg.req_time,
               (unsigned long long)msg.iteration);
        pthread_mutex_unlock(&mutexmsg);

    }

    printf("Thread exiting...\n");
    pthread_exit(NULL);
}



int
main(int argc, char* argv[])
{
    pthread_t thread_id;
    pthread_attr_t attr;

    // for portability, set thread explicitly as joinable
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
    {
        perror("pthread_create");
        exit(1);
    }

    pthread_attr_destroy(&attr);

    // signal handling setup
    struct sigaction t;
    t.sa_handler = termination_handler;
    sigemptyset(&t.sa_mask);
    t.sa_flags = 0;
    sigaction(SIGINT, &t, NULL);
    sigaction(SIGTERM, &t, NULL);

    struct sigaction a;
    a.sa_handler = alarm_handler;
    sigemptyset(&a.sa_mask);
    a.sa_flags = 0;
    sigaction(SIGALRM, &a, NULL);
    
    // Set the alarm interval
    alarm_interval.it_interval.tv_sec = 0;
    alarm_interval.it_interval.tv_usec = 0;
    alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
    alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;

    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    gettimeofday(&previous_time, NULL);

    while(1)
    {
        // Reset the alarm interval <-------- Rearm the timer in the main loop
        if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
        {
          perror("setitimer");
          raise(SIGTERM);
          break;
        }

        // suspending main thread until a signal is caught
        pause();

        if(!running)
        {
            // signal the worker thread to stop execution
            pthread_mutex_lock(&mutexmsg);
            pthread_cond_signal(&data_updated_cv);
            pthread_mutex_unlock(&mutexmsg);

            break;
        }

        if(work)
        {
            write_msg();
            work = false;
        }
    }

    // suspend thread until the worker thread joins back in
    pthread_join(thread_id, NULL);

    // reset the timer
    alarm_interval.it_value.tv_sec = 0;
    alarm_interval.it_value.tv_usec = 0;
    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    printf("EXIT\n");
    pthread_exit(NULL);
    
}

==================================================================

对原始问题的回答

可以使用条件变量(condition variable)等待来自发送方的信号。接收方被唤醒后,在 msgrcv() 的参数中传入 IPC_NOWAIT,以非阻塞方式检查消息队列中是否有消息。要结束通信,可以发送一条"通信结束"消息。也可以使用 pthread_cond_timedwait() 定期唤醒,检查"通信结束"或"接收方结束条件"(例如检查全局的 "running" 变量)。

接收端:

代码语言:c
复制
// Mutex initialization
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// Condition variable initialization
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
[...]
while (1) {
  // Lock the mutex
  pthread_mutex_lock(&mutex);

  // Check for messages (non blocking thanks to IPC_NOWAIT)
  rc = msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, IPC_NOWAIT);
  if (rc == -1) {
    if (errno == EINTR) {
      // Interrupted by a signal: simply poll the queue again.
      pthread_mutex_unlock(&mutex);
      continue;
    }
    if (errno != ENOMSG) {
      // Real error (e.g. EIDRM: the queue was removed). Nothing was received.
      pthread_mutex_unlock(&mutex);
      break;
    }

    // Message queue empty. Check the end condition under the lock before
    // sleeping, so a stop request (set + signalled under the same mutex)
    // cannot slip in between the check and the wait.
    if (!running) {
      pthread_mutex_unlock(&mutex);
      break;
    }

    // Wait for message notification
    pthread_cond_wait(&cond, &mutex); // <--- pthread_cond_timedwait() can be used to wake up and check for the end of communication or senders...

    // No message has been received yet (the wake-up may even be spurious):
    // go back and poll the queue instead of handling req_msg.
    pthread_mutex_unlock(&mutex);
    continue;
  }

  // Release the lock (so that the sender can post something in the queue)
  pthread_mutex_unlock(&mutex);

  // Handle the message: rc bytes were copied into req_msg.mtext
}

发送方:

代码语言:c
复制
// Prepare the message
[...]
// Take the lock
pthread_mutex_lock(&mutex);

// Send the message
msgsnd(msqid, &req_msg, strlen(req_msg.mtext) + 1, 0);

// Wake up the receiver
pthread_cond_signal(&cond);

// Release the lock
pthread_mutex_unlock(&mutex);

注意:System V 消息队列已经过时,最好使用较新的 POSIX 消息队列服务(Posix services)。

票数 0
EN

Stack Overflow用户

发布于 2020-11-29 06:20:57

我花了一整天阅读了大量关于线程和互斥锁的资料,并尝试让我的示例程序正常工作。程序确实能运行,但不幸的是,当我试图通过 Ctrl+C 关闭它时,它会卡住。原因(又一次)是:这一次工作线程在等待来自主线程的信号,而主线程不会再发送这个信号。

@Rachid K. 和 @Unslander Monica:如果你们愿意再看一眼,这个版本的代码是否有所改进?另外,我认为必须用 pthread_cond_timedwait 代替 pthread_cond_wait,才能避免退出时的死锁。你们能告诉我具体该怎么做吗?

注意,程序只是周期性地(间隔1s)将一个时间戳和一个循环计数器传递给打印数据的处理线程。输出还会显示调用print的时间。

再次感谢!

代码语言:c
复制
#include <stdbool.h> // bool data type
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>


#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0


static bool running = true;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;

struct message 
{
    uint64_t iteration;
    char req_time[28];
} msg;

pthread_mutex_t mutexmsg;
pthread_cond_t data_updated_cv;


static void
signal_handler(int signum)
{
    if (signum == SIGINT || signum == SIGTERM) 
    {
        running = false;
    }
}


static void
write_msg(int signum)
{
    if(!running)
    {
        return;
    }

    // Reset the alarm interval
    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        raise(SIGTERM);
        return;
    }

    struct timeval current_time;
    gettimeofday(&current_time, NULL);
    printf("\nLoop count: %lu\n", loop_count);
    printf("Loop time: %f us\n", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
                           (current_time.tv_usec - previous_time.tv_usec));
    previous_time = current_time;

    // format timeval struct
    char tmbuf[64];
    time_t nowtime = current_time.tv_sec;
    struct tm *nowtm = localtime(&nowtime);
    strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);

    // write values
    pthread_mutex_lock(&mutexmsg);
    msg.iteration = loop_count;
    snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, current_time.tv_usec);
    pthread_cond_signal(&data_updated_cv);
    pthread_mutex_unlock(&mutexmsg);

    loop_count++;
}


static void* 
process_msg(void *args)
{
    while(running)
    {
        pthread_mutex_lock(&mutexmsg);

        printf("Waiting for condition\n");
        pthread_cond_wait(&data_updated_cv, &mutexmsg);
        printf("Condition fulfilled\n");
        struct timeval process_time;
        gettimeofday(&process_time, NULL);

        char tmbuf[64];
        char buf[64];
        time_t nowtime = process_time.tv_sec;
        struct tm *nowtm = localtime(&nowtime);
        strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
        snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, process_time.tv_usec);

        printf("[%s] Message req time: %s loop cnt: %lu\n", buf, msg.req_time, msg.iteration);
        pthread_mutex_unlock(&mutexmsg);

    }

    pthread_exit(NULL);
}


int
main(int argc, char* argv[])
{
    pthread_t thread_id;
    pthread_attr_t attr;

    // for portability, set thread explicitly as joinable
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
    {
        perror("pthread_create");
        exit(1);
    }

    pthread_attr_destroy(&attr);

    // signal handling setup
    struct sigaction s;
    s.sa_handler = signal_handler;
    sigemptyset(&s.sa_mask);
    s.sa_flags = 0;
    sigaction(SIGINT, &s, NULL);
    sigaction(SIGTERM, &s, NULL);

    struct sigaction a;
    a.sa_handler = write_msg;
    sigemptyset(&a.sa_mask);
    a.sa_flags = 0;
    sigaction(SIGALRM, &a, NULL);
    
    // Set the alarm interval
    alarm_interval.it_interval.tv_sec = 0;
    alarm_interval.it_interval.tv_usec = 0;
    alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
    alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;

    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    gettimeofday(&previous_time, NULL);

    // suspend thread until the worker thread joins back in
    pthread_join(thread_id, NULL);

    // reset the timer
    alarm_interval.it_value.tv_sec = 0;
    alarm_interval.it_value.tv_usec = 0;
    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    pthread_exit(NULL);
    return 0;
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65030003

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档