我正在编写一个类似MD的模拟程序,我在使代码运行得更快方面遇到了一些困难。我使用Call砂子和kcache差制对代码进行了剖析,我似乎用了大约30%的时间来计算距离,43%用于设置链接单元列表和计算我的力,20%用于malloc和free (我假设在创建新的向量时调用了这些代码)。在我的模拟中,我使用了两个类:“粒子”,包含两个STL矢量(位置和速度);和“系统”,它包含粒子向量和我用来模拟的函数。
我使用的链接单元法包括将系统划分为大小相等的块(在我的例子中只有'1'),然后只考虑相邻块中粒子之间的相互作用。采用周期边界条件。
几乎所有CPU时间都在使用的两个函数如下:
bool checkDistance(Particle& p1, Particle& p2){ //checks whether the distance between p1 and p2 is less than 1
double dist = 0, relDist;
for ( int i = 0; i < dim; i++){ //loop over all dimensions
relDist = p1.pos[i] - p2.pos[i];
if (fabs(relDist) > halfSize){ //max separation in one dimension is half of system size with PBC
if(relDist < 0){ //shift the relative distance to the shortest possible one (minimum image convention)
relDist += systemSize;
}
else {
relDist -= systemSize;
}
}
dist += relDist * relDist;
}
return (dist < 1);
}和
vector< vector< double> > System::calcForces(vector<vector<double> >& noise){ //calculate the force working on all particles in my system, adds a noise to this force (which is made in other function)
vector<vector<double> > forces (numberOfParticles, vector<double>(dim,0.0)); //contains the force working on every particle
/* this looks up neighbors with linked cell method*/
int numberOfCells = systemSize/interactionRange; //number of cells in one dimension. This way, all particles within interactionRange from a certain particle are in the same cell and in the neighboring cells.
vector<vector<int> > header(numberOfCells, vector<int> (numberOfCells, -1)); //contains highest particle index for each cell, -1 if no particle
vector<int> link(numberOfParticles, -1); //gives index of particle in same cell; -1 if no particle
vector<int> centralCell, neighboringCell; //these contain the indices of particle in central cell and four of its neighbouring cells (in 2D). Don't need to loop over all neighbours for all cells.
centralCell.reserve(numberOfParticles/(systemSize*systemSize) *2); //this is the amount of ints every cell will on average contain, probably unnecessary to reserve
neighboringCell.reserve(numberOfParticles/(systemSize*systemSize) *2* 4);
vector<vector<double> > unitVectors (numberOfParticles, vector<double>(dim)); //create unit vector of velocities for all particles, needed for later force calculation
double norm = 0.0;
double dotProduct = 0.0; //needed for force calculation
for(int i = 0; i < numberOfParticles; i++){
for(int j = 0; j < dim; j++){
norm += particleList[i].vel[j] * particleList[i].vel[j];
}
norm = sqrt(norm);
for (int j = 0; j < dim; j++){
unitVectors[i][j] = particleList[i].vel[j] / norm;
}
norm = 0.0;
}
vector<vector<double> > averages(numberOfParticles, vector<double>(dim,0.0)); //contains average velocities of neighboring particles
vector<int> neighborCount(numberOfParticles,0); //number of neighbours for every particle
for (int i = 0; i < numberOfParticles; i++){ //fill header and link
int xIndex = numberOfCells * particleList[i].pos[0] / systemSize;
int yIndex = numberOfCells * particleList[i].pos[1] / systemSize;
link[i] = header[xIndex][yIndex];
header[xIndex][yIndex] = i;
}
for (int i = 0; i < numberOfCells; i++){ //only need to check with 4 cells. Uses PBC
for (int j = 0; j < numberOfCells; j++){
int tempIndex = header[i][j];
while (tempIndex > -1){ //fill vector with particles in central cell
centralCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
//fill vector with particles in 4 of the neighboring cells. The '%'s are used for periodic boundary conditions.
tempIndex = header[(i+1) % numberOfCells][j];
while (tempIndex > -1){
neighboringCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
tempIndex = header[(i+1) % numberOfCells][(j+1) % numberOfCells];
while (tempIndex > -1){
neighboringCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
tempIndex = header[i][(j+1) % numberOfCells];
while (tempIndex > -1){
neighboringCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
tempIndex = header[(i+1) % numberOfCells][((j-1) % numberOfCells + numberOfCells) % numberOfCells];
while (tempIndex > -1){
neighboringCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
//calculate distance between particles in central cell + particles in neighboring cell. if smaller than interaction range, add to neighbors
for (unsigned int k = 0; k < centralCell.size(); k++){
neighborCount[centralCell[k]] +=1;
for (int d = 0; d < dim; d++){
averages[centralCell[k]][d] += unitVectors[centralCell[k]][d];
}
for(unsigned int l = 0; l < k; l++){
if(checkDistance(particleList[centralCell[k]], particleList[centralCell[l]])){
neighborCount[centralCell[k]] +=1;
neighborCount[centralCell[l]] +=1;
for (int d = 0; d < dim; d++){
averages[centralCell[k]][d] += unitVectors[centralCell[l]][d];
averages[centralCell[l]][d] += unitVectors[centralCell[k]][d];
}
}
}
}
for(unsigned int k = 0; k < centralCell.size(); k++){
for(unsigned int l = 0; l < neighboringCell.size(); l++){
if(checkDistance(particleList[centralCell[k]], particleList[neighboringCell[l]])){
neighborCount[centralCell[k]] +=1;
neighborCount[neighboringCell[l]] +=1;
for (int d = 0; d < dim; d++){
averages[centralCell[k]][d] += unitVectors[neighboringCell[l]][d];
averages[neighboringCell[l]][d] += unitVectors[centralCell[k]][d];
}
}
}
}
centralCell.clear();
neighboringCell.clear();
}
}
int numberOfNeighbors;
for(int i = 0; i < numberOfParticles; i++){ //calculate social force + friction + noise
for (int k = 0; k < dim; k++){
forces[i][k] = noise[i][k] - particleList[i].vel[k]; //stochastic + friction
}
if(neighborCount[i] != 0){
numberOfNeighbors = neighborCount[i];
for(int k = 0; k < dim; k++){
dotProduct += averages[i][k]/numberOfNeighbors * unitVectors[i][k];
}
for (int k = 0; k < dim; k++){
forces[i][k] += couplingFactor * (averages[i][k]/numberOfNeighbors - unitVectors[i][k] * dotProduct);
}
dotProduct = 0.0;
}
}
return forces;
}至于第一个函数,在我看来,这是我所能得到的最有效的功能。也许还有比我用的“神话”更好的功能?
对于第二个函数,我觉得奇怪的是,我在malloc上花费了这么多时间,而且是免费的。有没有一种简单的方法可以减少这些电话的数量?还有其他方法可以加快速度吗?
最后,我想我需要并行化这些方法来获得我想要的结果。在这里使用像openMP或CUDA这样的东西会更有效吗?
发布于 2017-03-26 02:07:42
如果没有所有相关的类,很难确定,但是这两个嵌套循环块用来计算粒子之间的距离,对我来说绝对是难闻的。
在我看来,他们都在做几乎相同的事情,你应该能找到一种方法把它们组合成一个嵌套的循环块,也许在一个循环中有多个计数器,在一个循环中运行集中式单元格和邻接单元语句就可以了?
关于向量,你正在创建其中的7个,其中4个是2D的。我肯定会尝试将其中一些数据合并到一个补充类中,以减少您正在创建的向量的数量。
发布于 2017-03-26 22:43:04
在user111832的帮助下,我将代码更改为:
我已经添加了一个向量类(目前仅为2D,稍后将添加3D )
class Vector2D{
public:
double _x;
double _y;
Vector2D(){ _x = 0; _y = 0; }
Vector2D(double x, double y){ _x = x; _y = y; }
double norm() const
{
return sqrt(_x*_x + _y*_y);
}
Vector2D& operator+=(Vector2D v){
_x += v._x;
_y += v._y;
return *this;
}
Vector2D& operator-=(Vector2D v){
_x -= v._x;
_y -= v._y;
return *this;
}
Vector2D& operator*=(double s) {
_x *= s;
_y *= s;
return *this;
}
Vector2D& operator/=(double s) {
_x /= s;
_y /= s;
return *this;
}
};
inline Vector2D operator+(Vector2D a, Vector2D b) { return a += b; }
inline Vector2D operator-(Vector2D a, Vector2D b) { return a -= b; }
inline Vector2D operator*(Vector2D a, double s) { return a *= s; }
inline Vector2D operator*(Vector2D a, Vector2D b){ return a._x * b._x + a._y * b._y; } //inner product of two vectors
inline Vector2D operator*(double s, Vector2D b) { return b *= s; }
inline Vector2D operator/(Vector2D a, double s) { return a /= s; }
inline Vector2D normalize(Vector2D a){ return a / a.norm(); }使用此方法,我将粒子类更改为容器类。
class Particle {
public:
Vector2D pos;
Vector2D vel;
};然后,我的距离检查被更改为:
bool checkDistance(Particle& p1, Particle& p2){
double dist = 0, relDist;
relDist = p1.pos._x - p2.pos._x;
if (fabs(relDist) > systemSize / 2){ //max separation in one dimension is half of system size
if (relDist < 0){
relDist += systemSize;
}
else {
relDist -= systemSize;
}
}
dist += relDist * relDist;
relDist = p1.pos._y - p2.pos._y;
if (fabs(relDist) > systemSize / 2){ //max separation in one dimension is half of system size
if (relDist < 0){
relDist += systemSize;
}
else {
relDist -= systemSize;
}
}
dist += relDist * relDist;
return (dist < 1);
}和力的计算
vector<Vector2D> System::calcForces(vector<Vector2D>& noise){
vector<Vector2D> forces (numberOfParticles); //contains the force working on every particle
vector<Vector2D> averages(numberOfParticles); //contains average velocities of neighboring particles
/* creates neighborlist with linked cell method*/
int numberOfCells = systemSize; //number of cells in one dimension. This way, all particles within interactionRange from a certain particle are in the same cell and in the neighboring cells.
vector<vector<int> > header(numberOfCells, vector<int> (numberOfCells, -1)); //contains highest particle index for each cell, -1 if no particle -> need better option for this
vector<int> link(numberOfParticles, -1); //gives index of particle in same cell; -1 if no particle
vector<int> centralCell, neighboringCell;
centralCell.reserve(numberOfParticles/(systemSize*systemSize) *2);
neighboringCell.reserve(numberOfParticles/(systemSize*systemSize) *2* 4);
vector<Vector2D> unitVectors (numberOfParticles); //create unit vector of velocities for all particles
for(int i = 0; i < numberOfParticles; i++){
unitVectors[i] = normalize(particleList[i].vel);
}
vector<int> neighborCount(numberOfParticles,0);
for (int i = 0; i < numberOfParticles; i++){ //fill header and link
int xIndex = particleList[i].pos._x;
int yIndex = particleList[i].pos._y;
link[i] = header[xIndex][yIndex];
header[xIndex][yIndex] = i;
}
for (int i = 0; i < numberOfCells; i++){ //only need to check with 4 cells. Uses PBC
for (int j = 0; j < numberOfCells; j++){
int tempIndex = header[i][j];
while (tempIndex > -1){ //fill vector with particles in central cell
centralCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
int neighborIndexes[4] = {
header[(i + 1) % numberOfCells][j],
header[i][(j + 1) % numberOfCells],
header[(i + 1) % numberOfCells][(j + 1) % numberOfCells],
header[(i + 1) % numberOfCells][((j - 1) % numberOfCells + numberOfCells) % numberOfCells]
};
for (int k = 0; k < 4; k++)
{
int tempIndex = neighborIndexes[k];
while (tempIndex > -1) {
neighboringCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
}
int index1, index2;
//calculate distance between particles in central cell + particles in neighboring cell. if smaller than interaction range, add to neighbors
for (unsigned int k = 0; k < centralCell.size(); k++){
index1 = centralCell[k];
neighborCount[index1] +=1;
averages[index1] += unitVectors[index1];
for(unsigned int l = 0; l < k; l++){
index2 = centralCell[l];
if(checkDistance(particleList[index1], particleList[index2])){
neighborCount[index1] +=1;
neighborCount[index2] +=1;
averages[index1] += unitVectors[index2];
averages[index2] += unitVectors[index1];
}
}
}
for(unsigned int k = 0; k < centralCell.size(); k++){
index1 = centralCell[k];
for(unsigned int l = 0; l < neighboringCell.size(); l++){
index2 = neighboringCell[l];
if(checkDistance(particleList[index1], particleList[index2])){
neighborCount[index1] +=1;
neighborCount[index2] +=1;
averages[index1] += unitVectors[index2];
averages[index1] += unitVectors[index2];
}
}
}
centralCell.clear();
neighboringCell.clear();
}
}
double dotProduct = 0.0;
for(int i = 0; i < numberOfParticles; i++){ //calculate social force + friction + noise
forces[i] = noise[i]- particleList[i].vel; //stochastic + friction
averages[i] /= neighborCount[i] ;
dotProduct = averages[i] * unitVectors[i];
forces[i] += couplingFactor * (averages[i]- unitVectors[i] * dotProduct);
}
}
return forces;
}从表面上看,这要容易得多(虽然,我确信,这里还有很多工作可以做),并且将允许更容易地实现并行化。我想拿出一种方法来消除我现在使用的2D数组的“标题”,但我想这不是问这些问题的论坛。
一些实验表明,我的计算速度是现在的两倍。非常感谢大家!如有任何事项尚不清楚或需要更改,请通知我。
https://codereview.stackexchange.com/questions/158843
复制相似问题