我想查看一个图像,并处理与元素的order有关的一些特定值。该图像具有一个unsigned char*数组,它包含一个掩码(255个像素应该被处理,否则为0)和一个具有像素值的unsigned short*数组。
我用tbb实现了三种不同的方法,并通过掩码数组使用单个for-循环,并从循环变量x = i%width; y = i/width;中计算x,y-坐标。如果像素是可见的,我想使用Eigen转换点。vector4d是存储点的std::vector<std::array<double,4>>。
以下是我在tbb中的三个实现:
tbb::combinable 和 tbb::parallel_for
void Combinable(int width, int height, unsigned char* mask,unsigned short* pixel){
MyCombinableType.clear();
MyCombinableType.local().reserve(width*height);
tbb::parallel_for( tbb::blocked_range<int>(0, width*height),
[&](const tbb::blocked_range<int> &r)
{
vector4d& local = MyCombinableType.local();
const size_t end = r.end();
for (int i = r.begin(); i != end; ++i)
{
if(mask[i]!=0)
{
array4d arr = {i%width,i/width,(double)pixel[i],1};
//Map with Eigen and transform
local.push_back(arr);
}
}
});
vector4d idx = MyCombinableType.combine(
[]( vector4d x, vector4d y)
{
std::size_t n = x.size();
x.resize(n + y.size());
std::move(y.begin(), y.end(), x.begin() + n);
return x;
});
}tbb::enumerable_thread_specific 2. 和 tbb::parallel_for**:**
void Enumerable(int width, int height, unsigned char* mask,unsigned short* pixel){
MyEnumerableType.clear();
MyEnumerableType.local().reserve(width*height);
tbb::parallel_for( tbb::blocked_range<int>(0, width*height),
[&](const tbb::blocked_range<int> &r)
{
enumerableType::reference local = MyEnumerableType.local();
for (int i = r.begin(); i != r.end(); ++i)
{
if(mask[i]!=0)
{
array4d arr = {i%width,i/width,(double)pixel[i],1};
//Map with Eigen and transform
local.push_back(arr);
}
}
});
vector4d idx = MyEnumerableType.combine(
[](vector4d x, vector4d y)
{
std::size_t n = x.size();
x.resize(n + y.size());
std::move(y.begin(), y.end(), x.begin() + n);
return x;
});
}tbb::parallel_reduce**:** 3.
void Reduce(int width, int height, unsigned char* mask,unsigned short* pixel){
vector4d idx = tbb::parallel_reduce(
tbb::blocked_range<int>(0, width*height ),vector4d(),
[&](const tbb::blocked_range<int>& r, vector4d init)->vector4d
{
const size_t end = r.end();
init.reserve(r.size());
for( int i=r.begin(); i!=end; ++i )
{
if(mask[i]!=0)
{
array4d arr = {i%width,i/width,(double)pixel[i],1};
//Map with Eigen and transform
init.push_back(arr);
}
}
return init;
},
[]( vector4d x,vector4d y )
{
std::size_t n = x.size();
x.resize(n + y.size());
std::move(y.begin(), y.end(), x.begin() + n);
return x;
}
);
}我将这三个版本的运行时与串行实现进行了比较。阵列有8400000个单元,每组重复100次。研究结果如下:
我假设combine语句是这里的瓶颈。我做错什么了?为什么parallel_reduce soo要慢得多?请帮帮我!
发布于 2016-08-25 15:23:28
您可以在这里应用很少的优化。
const vector4d&代替,在任何地方都使用[&] lambda。vector4d,而不是调整其中一个参数的大小,并将其用于返回语句。blocked_range2d而不是计算x = i%width; y = i/width。这不仅优化了过多的计算,而且更重要的是,它优化了可能改善缓存使用的缓存访问模式(但在本例中并非如此)。发布于 2016-09-19 13:06:23
您正在使用parallel_reduce的函数式形式,改用更有效的命令式。不幸的是,不能使用lambdas调用它,您必须定义一个Body类:
它应该最小化在您的缩减过程中生成的vector4d副本的数量。vector4d应该是您的Body类的成员,这样它就可以被多个范围重用和附加,而不是为每个细分范围构建和合并唯一的vector4d。
(注意:拆分构造函数不应该复制vector4d成员的内容,注意在上面的英特尔示例中,value总是被初始化为0。)
https://stackoverflow.com/questions/39083061
复制相似问题