我正在玩元胞自动机,并试图通过使用多线程来提高我的应用程序性能。但我有一些有趣的结果。我不知道为什么会发生这种事,我错过了什么.
因此,我的目标是尽可能快地处理大量数据缓冲区。在我的示例中,我有一个大的(20000×20000) bool数组,并将其转换为图像(一个bool值到一个像素)。这个过程可以并行进行;像素之间不存在任何依赖关系。我将bool数组划分为threadCount块数,为每个块启动一个新线程,让它们运行,然后等待它们完成。
我以为有了更多的线程,我的运行时会更好一些。(我不使用不现实的线程计数,只在一个和逻辑核心编号之间使用。)
所以我写了这个:
typedef std::size_t Size;
typedef std::vector<bool> Data;
typedef std::vector<Data> History;
class RenderTask
{
public:
typedef void result_type;
public:
RenderTask(Ppm& ppm,
const Ppm::Pixel& fColor)
: mPpm(ppm),
mForegroundColor(fColor)
{
}
void operator()(const History& history,
const Size minIdxX,
const Size countX,
const Size minIdxY,
const Size countY)
{
const Size maxIdxX(minIdxX + countX);
const Size maxIdxY(minIdxY + countY);
for(Size y(minIdxY); y < maxIdxY; ++y)
{
for(Size x(minIdxX); x < maxIdxX; ++x)
{
if(history[y][x])
{
mPpm.setPixel(x, y, mForegroundColor);
}
}
}
}
private:
Ppm& mPpm;
const Ppm::Pixel mForegroundColor;
};
void render(const History& history,
Ppm& ppm,
const Ppm::Pixel& fColor,
const Size threadCount)
{
boost::asio::io_service io_service;
boost::thread_group threads;
for(Size i(0); i < threadCount; ++i)
{
threads.create_thread(boost::bind(&boost::asio::io_service::run,
&io_service));
}
RenderTask task(ppm, fColor);
io_service.reset();
const Size count(history.size() / threadCount);
const Size rem(history.size() % threadCount);
Size minIdxY(0);
for(Size i(0); i < threadCount; ++i)
{
const bool addRemainders(rem && i == threadCount - 1);
io_service.post(boost::bind(task,
boost::cref(history),
0,
history.front().size(),
minIdxY,
addRemainders ? count + rem : count));
minIdxY += count;
}
threads.join_all();
}
int main(int argc, char* argv[])
{
const Size rule(parseNumber<Size>(argv[1]));
const Size size(parseNumber<Size>(argv[2]));
const Size iteration(parseNumber<Size>(argv[3]));
const Size threadCount(clamp(1,
static_cast<Size>(boost::thread::physical_concurrency())
parseNumber<Size>(argv[4])));
...
History history(iteration, Data(size, false));
history.front()[size / 2] = true;
...
process(history, rule, threadCount);
...
Ppm ppm(history.front().size(), history.size(), Ppm::Pixel(30, 30, 30));
std::cout << "rendering... ";
t.start();
render(history, ppm, Ppm::Pixel(200, 200, 200), threadCount);
t.stop();
std::cout << t.ms() << " ms" << std::endl;
}但是,当我以不同数量的线程运行程序时,我得到了以下信息:
我不知道为什么更多的核心不能带来更好的性能。有两个核更好,但有趣的是,有三个核,它几乎和一个核心一样.这些数值是平均数:
ThreadingTest.exe 110 20000 20000 1 test.ppm
rendering... 554.95 ms
ThreadingTest.exe 110 20000 20000 2 test.ppm
rendering... 289.75 ms
ThreadingTest.exe 110 20000 20000 3 test.ppm
rendering... 555.37 ms
ThreadingTest.exe 110 20000 20000 4 test.ppm
rendering... 554.23 ms
ThreadingTest.exe 110 20000 20000 5 test.ppm
rendering... 564.23 ms
ThreadingTest.exe 110 20000 20000 6 test.ppm
rendering... 551.82 ms
ThreadingTest.exe 110 20000 20000 7 test.ppm
rendering... 555.22 ms
ThreadingTest.exe 110 20000 20000 8 test.ppm
rendering... 510.12 ms是什么导致了这一切?我是不是用错了io_service?不涉及I/O操作,只涉及纯内存。
我的机器有八个核心,内存为16 GB。
关于更多的细节,以下是Ppm级的大纲:
class Ppm
{
public:
struct Pixel
{
typedef unsigned char ChannelType;
ChannelType mRed, mGreen, mBlue;
...
};
typedef std::vector<Pixel> ImageData;
Ppm( const SizeType width
, const SizeType height
, const Pixel& color = Pixel() )
: mWidth( width )
, mHeight( height )
, mImageData( mWidth * mHeight, color )
{ }
void setPixel( SizeType x, SizeType y, const Pixel& p )
{
mImageData[x + y * mWidth] = p;
}
...
private:
const SizeType mWidth;
const SizeType mHeight;
ImageData mImageData;
};更新
在你的宝贵评论之后,我改变了很多方法,并写到:现在我使用的是纯c++'11的东西,不再有任何的提升。
class ThreadPool
{
public:
ThreadPool(const Size threadCount);
~ThreadPool();
public:
template<class T>
void addTask(T task);
void wait();
private:
bool mStopped;
Size mRunningCount;
std::vector<std::thread> mWorkers;
std::deque<std::function<void()>> mTasks;
std::mutex mMutex;
std::condition_variable mCondition;
std::condition_variable mFinishCondition;
};
ThreadPool::ThreadPool(const Size threadCount)
: mStopped(false),
mRunningCount(0)
{
for (Size i(0); i < threadCount; ++i)
{
mWorkers.push_back(std::thread([this]()
{
std::function<void()> task;
while(true)
{
{
std::unique_lock<std::mutex> lock(this->mMutex);
this->mCondition.wait(lock, [this] { return this->mStopped ||
!this->mTasks.empty();
});
if(this->mStopped)
{
return;
}
++this->mRunningCount;
task = this->mTasks.front();
this->mTasks.pop_front();
}
task();
{
std::unique_lock<std::mutex> lock(this->mMutex);
--this->mRunningCount;
}
this->mFinishCondition.notify_all();
}
}));
}
}
ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(mMutex);
mStopped = true;
mCondition.notify_all();
}
for(auto& worker : mWorkers)
{
worker.join();
}
}
template<class T>
void ThreadPool::addTask(T task)
{
{
std::unique_lock<std::mutex> lock(mMutex);
mTasks.push_back(std::function<void()>(task));
}
mCondition.notify_one();
}
void ThreadPool::wait()
{
std::unique_lock<std::mutex> lock(mMutex);
mFinishCondition.wait(lock, [this]()
{
return mTasks.empty() && mRunningCount == 0;
});
}现在性能还好;使用更多的线程运行时会变得更快.但是,等待的方法有些不好。我是这样用的:
ThreadPool pool(threadCount);
for(Size i(1); i < iteration; ++i)
{
Size count(history.front().size() / threadCount);
Size rem(history.front().size() % threadCount);
Size minIdx(0);
for(Size n(0); n < threadCount; ++n)
{
pool.addTask(std::bind(ECATask(rule),
std::cref(history[i-1]),
std::ref(history[i]),
minIdx,
(rem && n == threadCount - 1) ?
count + rem :
count));
minIdx += count;
}
pool.wait();
}这方面的问题还不清楚,但似乎pool.wait()有时不等待所有当前任务完成,代码开始新的迭代.你能帮我做个代码检查吗?:)
发布于 2018-01-03 14:11:53
有相当多的迹象表明您对io_service感到困惑:
boost::asio::io_service io_service;
boost::thread_group threads;
for(Size i(0); i < threadCount; ++i)
{
threads.create_thread(boost::bind(&boost::asio::io_service::run
, &io_service));
}
RenderTask task(ppm, fColor);
io_service.reset();这些问题:
run()立即完成,退出线程。因此,整个过程是一个巨大的竞争条件:如果幸运的话,一些线程(可能没有线程)不会在第一个任务排队之前启动io_service::run(),所以所有这些看起来都是可行的。
看看这个答案,我们可以找到一个好的线程池,而不是使用Boost Asio,还有一个使用了Boost Asio的线程池:c++ work queues with blocking。
注意使用
io_service::work避免过早退出工作线程。
https://stackoverflow.com/questions/48076701
复制相似问题