我试图开发一个管道,其中数据首先被读取和处理,一次操作,以一种不同的方式操作,然后显示。我想到了一个设计,在这个设计中,数据IO输入到由第一个操作程序读取的缓冲区中。随后,该第一机械手写入另一个缓冲器,该缓冲器在可能时由第二机械手读取。最后,将第二个机械手的输出写入显示缓冲器,显示缓冲器由可视化器读取,并使用OpenGL进行显示。
在我看来,这是一个相当简单的并行问题,每个任务都有自己的线程,并通过数据缓冲区进行通信。然而,我所遇到的关于线程程序的所有教程似乎都表明,多线程是由一些决定如何分配工作负载的中间件(如OpenMP)决定的。
我刚开始开发多线程应用程序,所以这可能是一个愚蠢的问题,但我所描述的是可行的吗?是否可以使用像OpenMP这样的中间件来完成呢?我意识到显而易见的答案是“尝试它”,我也想这样做,但是这些教程并没有说明如何去尝试它。
发布于 2017-05-25 08:19:01
OpenMP更适合于易于跨越多核(SIMD)的算法。其他方案是可能的,但在您的情况下,我认为直接使用线程将更好地工作,并将更易于编码和维护。
我将我的答案分为两部分:一个没有OpenMP的通用解决方案,以及一些使用OpenMP的特定更改。
正如注释中提到的,您正面临生产者/消费者问题,但有两次:一个线程正在填充一个缓冲区(生成一个项),然后必须用第二个缓冲区(消耗)读取(并修改)该缓冲区。问题的特殊性在于,第二个线程也是一个生成器(要绘制的图像),第三个线程负责使用它(可视化器)。
正如您已经知道的,P/C问题是使用缓冲区(可能是循环缓冲区或生成项队列)解决的,其中缓冲区的每个元素被标记为已生成或消耗,并且线程在添加或从其中获取项时具有独占访问权限。
让我们在下面的示例程序中对您的问题使用队列方法。
注意:为了简单起见,我假设您可以访问C++11编译器。使用其他API的实现相对类似。
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <list>
using namespace std::chrono_literals;
std::mutex g_data_produced_by_m1_mutex;
std::list<int> g_data_produced_by_m1;
std::mutex g_data_produced_by_m2_mutex;
std::list<int> g_data_produced_by_m2;
std::atomic<bool> stop = false;
void manipulator1_kernel()
{
while (!stop) {
// Producer 1: generate data
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
g_data_produced_by_m1.push_back(rand());
}
std::this_thread::sleep_for(100ms);
}
}
void manipulator2_kernel()
{
int data;
while (!stop) {
// Consumer 1
while (!stop) { // wait until there is an item to be consumed
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
if (!g_data_produced_by_m1.empty()) { // is there data to be consumed?
data = g_data_produced_by_m1.front(); // consume
g_data_produced_by_m1.pop_front();
break;
}
}
std::this_thread::sleep_for(100ms);
}
// Producer 2: modify and send to the visualizer
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
g_data_produced_by_m2.push_back(5 * data);
}
std::this_thread::sleep_for(100ms);
}
}
void visualizer_kernel()
{
int data;
while (!stop) {
// Consumer 2
while (!stop) { // wait until there is an item to be visualized
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
if (!g_data_produced_by_m2.empty()) {
data = g_data_produced_by_m2.front();
g_data_produced_by_m2.pop_front();
break;
}
}
std::this_thread::sleep_for(100ms);
}
std::cout << data << std::endl; // render to display
std::this_thread::sleep_for(100ms);
if (data % 8 == 0) stop = true; // some stop condition for the example
}
}
int main()
{
std::thread manipulator1(manipulator1_kernel);
std::thread manipulator2(manipulator2_kernel);
std::thread visualizer(visualizer_kernel);
visualizer.join();
manipulator2.join();
manipulator1.join();
return 0;
}如果您仍然想使用OpenMP,您可能能找到的最接近的东西是任务 (我认为是从OpenMP 3.0开始)。我没有经常使用它们,但是上面的程序可以重写如下:
int main()
{
#pragma omp parallel
{
#pragma omp task
manipulator1_kernel();
#pragma omp task
manipulator2_kernel();
#pragma omp task
visualizer_kernel();
#pragma omp taskwait
}
return 0;
}代码的其余部分也可以更改为使用OpenMP特性,但我认为这回答了您的问题。
这种方法的主要问题是,您必须为任务创建一个代码块,以便在OpenMP parallel中生存,这很容易使您的应用程序逻辑和结构的其余部分复杂化。
发布于 2017-06-05 12:45:24
为了解决这个特殊问题,Intel线程构建块库包含特殊的构造。Intel TBB是帮助多线程编程的跨平台库.我们可以查看应用程序中涉及的实体,以及四个不同的任务提供者。一种类型的任务是输入任务--提供输入数据的任务,第一个操作例程提供的另一种任务,等等。
因此,用户唯一需要做的事情就是为这些任务提供主体。库中有几个API,用于指定要处理的主体以及如何并行处理。其他一切(这里我指线程创建、任务执行之间的同步、工作平衡等等)。是由图书馆完成的。
我想到的解决方案的最简单的变体是使用管道函数。这是原型:
#include "tbb/pipeline.h"
using namespace tbb;
int main() {
parallel_pipeline(/*specify max number of bodies executed in parallel, e.g.*/16,
make_filter<void, input_data_type>(
filter::serial_in_order, // read data sequentially
[](flow_control& fc) -> input_data_type {
if ( /*check some stop condition: EOF, etc.*/ ) {
fc.stop();
return input_data_type(); // return dummy value
}
auto input_data = read_data();
return input_data;
}
) &
make_filter<input_data_type, manipulator1_output_type>(
filter::parallel, // process data in parallel by the first manipulator
[](input_data_type elem) -> manipulator1_output_type {
auto processed_elem = manipulator1::process(elem);
return processed_elem;
}
) &
make_filter<manipulator1_output_type, manipulator2_output_type>(
filter::parallel, // process data in parallel by the second manipulator
[](manipulator1_output_type elem) -> manipulator2_output_type {
auto processed_elem = manipulator2::process(elem);
return processed_elem;
}
) &
make_filter<manipulator2_output_type, void>(
filter::serial_in_order, // visualize frame by frame
[](manipulator2_output_type elem) {
visualize(elem);
}
)
);
return 0;
}只要实现了必要的函数(read_data,可视化)。这里,input_data_type、manipulator1_output_type等是管道级之间传递的类型,而机械手的process函数对传递的参数进行必要的计算。
顺便说一句,为了避免使用锁和其他同步原语,您可以使用库中的concurrent_bounded_queue并通过可能不同的线程(例如专用于IO操作)将输入数据放入这个队列,就像concurrent_bounded_queue_instance.push(elem)一样简单,然后通过input_data_type elem; concurrent_bounded_queue_instance.pop(elem)读取它。请注意,弹出一个项在这里是一个阻塞操作。concurrent_queue提供了非阻塞try_pop替代方案。
另一种可能是使用tbb::flow_graph及其节点来组织相同的流水线方案。看看描述依赖性和数据流图的两个例子。您可能需要使用节点来正确排序项的执行(如果需要的话)。
值得阅读使用tbb标记的SO问题,看看其他人是如何使用这个库的。
发布于 2017-06-06 15:48:18
您实现了单线程版本吗?侧写的?
它们是关键步骤,您可以获得高度并行设计的最佳实现,只是为了认识到瓶颈是缓冲区和/或线程同步和/或错误共享和/或缓存丢失或类似问题的I/O。
首先,我尝试使用一个简单的线程池,其中包含按顺序执行所有步骤的任务。然后,在分析了它的工作原理、CPU消耗等问题之后,我尝试了更复杂的工具,总是将它们的性能与第一个简单版本进行比较。
https://stackoverflow.com/questions/44169351
复制相似问题