首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用于数据流采集和处理的c++多线程实现方法

用于数据流采集和处理的c++多线程实现方法
EN

Stack Overflow用户
提问于 2022-03-31 08:00:15
回答 1查看 49关注 1票数 0

我是c++开发的新手。我试图运行相互独立的无限函数。以下是问题陈述的笑料:

我试图实现的方法是

代码语言:javascript
复制
#include <iostream>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <mutex>

int g_i = 0;
std::mutex g_i_mutex; // protects g_i

// increment g_i by 1
void increment_itr()
{
  const std::lock_guard<std::mutex> lock(g_i_mutex);
  g_i += 1;
}

void *fun(void *s)
{
  std::string str;
  str = (char *)s;
  std::cout << str << " start\n";
  while (1)
  {
    std::cout << str << " " << g_i << "\n";
    if(g_i > 1000) break;
    increment_itr();
  }
  pthread_exit(NULL);
  std::cout << str << " end\n";
}

void *checker(void *s) {
  while (1) {
    if(g_i > 1000) {
      std::cout<<"**********************\n";
      std::cout << "checker: g_i == 100\n";
      std::cout<<"**********************\n";
      pthread_exit(NULL);
    }
  }
}


int main()
{
  int itr = 0;
  pthread_t threads[3];
  pthread_attr_t attr;
  void *status;

  // Initialize and set thread joinable
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  int rc1 = pthread_create(&threads[0], &attr, fun, (void *)&"foo");
  int rc2 = pthread_create(&threads[1], &attr, fun, (void *)&"bar");
  int rc3 = pthread_create(&threads[2], &attr, checker, (void *)&"checker");
  

  if (rc1 || rc2 || rc3)
  {
    std::cout << "Error:unable to create thread," << rc1 << rc2 << rc3 << std::endl;
    exit(-1);
  }

  pthread_attr_destroy(&attr);

  std::cout << "main func continues\n";

  for (int i = 0; i < 3; i++)
  {
    rc1 = pthread_join(threads[i], &status);
    if (rc1)
    {
      std::cout << "Error:unable to join," << rc1 << std::endl;
      exit(-1);
    }
    std::cout << "Main: completed thread id :" << i;
    std::cout << "  exiting with status :" << status << std::endl;
  }

  std::cout << "main end\n";

  return 0;
}

这是可行的,但我想知道这个实现是一种标准的方法,还是可以用更好的方式完成?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-03-31 08:09:36

您在increment_itr中正确地使用了一个锁,但是您的fun函数在访问g_i时没有获得锁。

改变这一点:

代码语言:javascript
复制
void increment_itr()
{
  const std::lock_guard<std::mutex> lock(g_i_mutex);
  g_i += 1;
}

到这个

代码语言:javascript
复制
int increment_itr()
{
  std::lock_guard<std::mutex> lock(g_i_mutex);  // the const wasn't actually needed
  g_i = g_i + 1;
  return g_i;  // return the updated value of g_i
}

这不是线程安全的:

代码语言:javascript
复制
if(g_i > 1000) break;  // access g_i without acquiring the lock
increment_itr();

这个-更好:

代码语言:javascript
复制
if (increment_itr() > 1000) {
    break;
}

checker中需要类似的修复

代码语言:javascript
复制
void *checker(void *s) {
  while (1) {
    int i;
    {
      std::lock_guard<std::mutex> lock(g_i_mutex);
      i = g_i;
    }
    if(i > 1000) {
      std::cout<<"**********************\n";
      std::cout << "checker: g_i == 100\n";
      std::cout<<"**********************\n";
      break;
    }
    return NULL;
  }

关于你的设计问题。这是最根本的问题。

您正在提议一个专用线程,该线程不断地获取一个锁,并对数据结构进行某种类型的检查。如果满足某一条件,它将执行一些额外的处理,例如写入数据库。如果数据结构(这两个映射)中没有任何变化,那么在无限循环中旋转的线程将是浪费的。相反,您只希望在某些事情发生变化时运行您的完整性检查。您可以使用条件变量让检查器线程暂停,直到某些事情发生实际变化。

这是一个更好的设计。

代码语言:javascript
复制
uint64_t g_data_version = 0;
std::conditional_variable g_cv;

void *fun(void *s)
{
    while (true) {

        << wait for data from the source >> 

        {
            std::lock_guard<std::mutex> lock(g_i_mutex);
            // update the data in the map while under a lock
            // e.g. g_n++;
            //

            // increment the data version to signal a new revision has been made
            g_data_version += 1;
        }

        // notify the checker thread that something has changed
        g_cv.notify_all();
    }

}

然后,您的检查器函数只有在fun发出信号表示发生了变化时才会醒来。

代码语言:javascript
复制
void *checker(void *s) {
  while (1) {

      // lock the mutex
      std::unique_lock<std::mutex> lock(g_i_mutex);

      // do the data comparison check here

      // now wait for the data version to change
      uint64_t version = g_data_version;
      while (version != g_data_version) { // check for spurious wake up
         cv.wait(lock); // this atomically unlocks the mutex and waits for a notify() call on another thread to happen
      }
  }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71688949

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档