这是一种主从关系。如何让主进程以非阻塞的方式搜索传输给它的消息。如果在搜索时没有消息传输到master,它将继续迭代。但是,如果有消息传输给他,它将处理该消息,然后继续迭代。查看/*内部的评论*/
int main(int argc, char *argv[])
{
int numprocs,
rank;
MPI_Request request;
MPI_Status status;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
if(rank == 0) // the searching process
{
for (int i=0; i < 4000000; i++)
{
// do some stuff here; does not matter what
/* see if any message has been transmitted to me at this point
without blocking the process; if at this time it happens to be
transmitted, do something and than continue with for iternations;
or just continue with for iterations and maybe next time will
have a message which sends me to do something */
}
}
else
{
int flag = 1;
while(flag)
{
// something done that at some point changes flag
}
// send a message to process with rank 0 and don't get stuck here
MPI_Isend(12, 1, MPI_INT, 0, 100, MPI_COMM_WORLD, &request);
// some other stuff done
// wait for message to be transmitted
MPI_Wait(&request, &status);
}
MPI_Finalize();
return 0;
}发布于 2013-11-16 20:43:00
可用消息的非阻塞测试是使用MPI_Iprobe调用完成的。在您的示例中,它将如下所示:
int available;
MPI_Status status;
if(rank == 0) // the searching process
{
for (int i=0; i < 4000000; i++)
{
// do some stuff here; does not matter what
/* see if any message has been transmitted to me at this point
without blocking the process; if at this time it happens to be
transmitted, do something and than continue with for iternations;
or just continue with for iterations and maybe next time will
have a message which sends me to do something */
// Tag value 100 matches the value used in the send operation
MPI_Iprobe(MPI_ANY_SOURCE, 100, MPI_COMM_WORLD, &available, &status);
if (available)
{
// Message source rank is now available in status.MPI_SOURCE
// Receive the message
MPI_Recv(..., status.MPI_SOURCE, status.MPI_TAG, MPI_COMM_WORLD, &status);
}
}
}MPI_ANY_SOURCE用作通配符等级,即指示MPI_Irecv检查来自任何来源的消息。如果发布了相应的发送,则available将设置为true,否则将设置为false。消息的实际来源也会写入status对象的MPI_SOURCE字段。如果available标志指示匹配消息的可用性,则应发布接收操作以接收该消息。重要的是,在接收操作中明确指定秩和标签,否则可能会接收到不同的消息。
您还可以使用持久连接。这些操作的行为非常类似于非阻塞操作,但重要的区别在于它们可以多次重启。具有持久连接的相同代码将如下所示:
if(rank == 0) // the searching process
{
MPI_Request req;
MPI_Status status;
int completed;
// Prepare the persistent connection request
MPI_Recv_init(buffer, buf_size, buf_type,
MPI_ANY_SOURCE, 100, MPI_COMM_WORLD, &req);
// Make the request active
MPI_Start(&req);
for (int i=0; i < 4000000; i++)
{
// do some stuff here; does not matter what
/* see if any message has been transmitted to me at this point
without blocking the process; if at this time it happens to be
transmitted, do something and than continue with for iternations;
or just continue with for iterations and maybe next time will
have a message which sends me to do something */
// Non-blocking Test for request completion
MPI_Test(&req, &completed, &status);
if (completed)
{
// Message is now in buffer
// Process the message
// ...
// Activate the request again
MPI_Start(&req);
}
}
// Cancel and free the request
MPI_Cancel(&req);
MPI_Request_free(&req);
}持久化操作在性能上比非持久化操作略有优势,如前面的代码示例所示。重要的是,当请求处于活动状态时,不能访问buffer,即在调用MPI_Start之后和MPI_Test发出完成信号之前。持久的发送/接收操作也与非持久的接收/发送操作相匹配,因此不需要更改工作进程的代码,它们仍然可以使用MPI_Isend。
发布于 2013-11-16 21:02:01
一种解决方案是使用MPI_IProbe()来测试消息是否正在等待。
MPI_Isend(12,1,MPI_INT,0,100,MPI_COMM_WORLD,&request);
while( flag !=0) { //在某一时刻更改了flag的操作}
代码如下:
#include "mpi.h"
#include "stdio.h"
int main(int argc, char *argv[])
{
int numprocs,
rank;
MPI_Request request;
MPI_Status status;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
if(rank == 0) // the searching process
{
int i;
for (i=0; i < 4000000; i++)
{
// do some stuff here; does not matter what
//printf("I am still running!\n");
int flag;
MPI_Iprobe(MPI_ANY_SOURCE,100,MPI_COMM_WORLD,&flag,&status);
if(flag!=0){
int value;
MPI_Recv(&value, 1, MPI_INT, status.MPI_SOURCE, status.MPI_TAG, MPI_COMM_WORLD, &status);
printf("I (0) received %d \n",value);
}
/* see if any message has been transmitted to me at this point
without blocking the process; if at this time it happens to be
transmitted, do something and than continue with for iternations;
or just continue with for iterations and maybe next time will
have a message which sends me to do something */
}
}
else
{
int i;
for(i=0;i<42;i++){
int flag = 1;
while(flag!=0)
{
// something done that at some point changes flag
flag=0;
}
int bla=1000*rank+i;
// send a message to process with rank 0 and don't get stuck here
MPI_Isend(&bla, 1, MPI_INT, 0, 100, MPI_COMM_WORLD, &request);
// some other stuff done
printf("I (%d) do something\n",rank);
// wait for message to be transmitted
MPI_Wait(&request, &status);
}
}
MPI_Finalize();
return 0;
}再见,
弗朗西斯
https://stackoverflow.com/questions/20017814
复制相似问题