我正在为Rpi平台工作一个(个人)轻量级的分布式项目。实现分布式需要节点之间的P2P通信,我认为这是学习套接字和UDP的好时机。
最终的目标是达到节点可以做诸如领导人选举之类的事情,并通过消息在它们之间分配任务。
下面的代码使用pthreads打开两个套接字;一个是发送,另一个是接收。接收套接字现在只打印它收到的消息。
我在两个码头容器上编译并运行了这段代码,它可以工作。我想知道以下几点:
因为我可以获得发件人的主机名/IP,所以我可以有一个简单的开关/if-else,并根据发件人采取行动。
什么消息大小足够小以致于多个发送者的数据包不混合?
也是关于下面一句
网络堆栈将为您缓冲(有限数量的)传入UDP数据包,这样(假设您相对及时地调用recv() ),任何传入的数据包都不会丢失。
下面的代码是否满足这一要求(当然,假设我已经对Hostname/IP添加了一个检查)?
若否,应如何修改?在线程中使用select/poll会更好吗?
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#define BUF_SIZE 500
struct host
{
char *node;
char *port;
char *msg;
};
int
create_socket(char *host, char *port)
{
struct addrinfo hints;
struct addrinfo *result, *rp;
int sfd;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
hints.ai_protocol = 0; /* Any protocol */
if (host != NULL)
{
hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
} else {
hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
hints.ai_protocol = 0; /* Any protocol */
hints.ai_canonname = NULL;
hints.ai_addr = NULL;
hints.ai_next = NULL;
}
int s = getaddrinfo(host, port, &hints, &result);
if (s != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
exit(EXIT_FAILURE);
}
/* getaddrinfo() returns a list of address structures.
Try each address until we successfully connect(2).
If socket(2) (or connect(2)/bind(2)) fails, we (close the socket
and) try the next address. */
for (rp = result; rp != NULL; rp = rp->ai_next) {
// printf("this is the socketaddr data %s\n", rp->ai_addr->sa_data[]);
sfd = socket(rp->ai_family, rp->ai_socktype,
rp->ai_protocol);
if (sfd == -1)
continue;
if (host == NULL)
{
printf("%s\n", "host is null");
if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) /* server socket */
break; /* Success */
}
else
{
if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1) /* client socket */
break; /* Success */
}
close(sfd);
}
if (rp == NULL) { /* No address succeeded */
fprintf(stderr, "Could not bind/connect\n");
exit(EXIT_FAILURE);
}
freeaddrinfo(result); /* No longer needed */
return sfd;
}
void*
send_msg(void *arg)
{
size_t len;
struct host *receiver = (struct host *) arg;
int sfd = create_socket(receiver->node, receiver->port);
printf("sending sfd = %d\n", sfd);
len = strlen(receiver->msg) + 1;
/* +1 for terminating null byte */
if (len + 1 > BUF_SIZE) {
fprintf(stderr,
"Ignoring long message\n");
}
for(;;)
{
printf("sending message %s\n", receiver->msg);
if (write(sfd, receiver->msg, len) != len) {
fprintf(stderr, "partial/failed write\n");
close(sfd);
sleep(1);
sfd = create_socket(receiver->node, receiver->port);
}
sleep(5);
}
pthread_exit(NULL);
}
void*
recv_msg(void *arg)
{
int s;
ssize_t nread;
char buf[BUF_SIZE];
socklen_t peer_addr_len;
struct sockaddr_storage peer_addr;
struct host *me = (struct host *) arg;
int sfd = create_socket(me->node, me->port);
printf("receive sfd = %d\n", sfd);
for (;;) {
printf("ready to receive message\n");
peer_addr_len = sizeof(struct sockaddr_storage);
nread = recvfrom(sfd, buf, BUF_SIZE, 0,
(struct sockaddr *) &peer_addr, &peer_addr_len);
printf(">>> %d\n", nread);
if (nread == -1)
continue; /* Ignore failed request */
char host[NI_MAXHOST], service[NI_MAXSERV];
s = getnameinfo((struct sockaddr *) &peer_addr,
peer_addr_len, host, NI_MAXHOST,
service, NI_MAXSERV, NI_NUMERICSERV);
if (s == 0)
printf("Received %zd bytes from %s:%s\n",
nread, host, service);
else
fprintf(stderr, "getnameinfo: %s\n", gai_strerror(s));
}
pthread_exit(NULL);
}
void
create_threads(struct host *me,struct host *receiver)
{
pthread_t tids[2];
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_create(&tids[0], &attr, send_msg, receiver);
pthread_create(&tids[1], &attr, recv_msg, me);
pthread_join(tids[0], NULL);
pthread_join(tids[1], NULL);
}
int
main(int argc, char *argv[])
{
struct host me;
struct host receiver;
if (argc < 3) {
fprintf(stderr, "Usage: %s host port msg...\n", argv[0]);
exit(EXIT_FAILURE);
}
receiver.node = argv[1];
receiver.port = argv[2];
receiver.msg = argv[3];
me.node = NULL;
me.port = argv[2];
create_threads(&me, &receiver);
exit(EXIT_SUCCESS);
}发布于 2018-06-05 16:19:22
下面的代码是否满足以下(?)
小评论:次要问题。
if (len + 1 > BUF_SIZE) {看上去一败涂地,方向不佳.我认为if (len > BUF_SIZE) {是因为len已经包含了空字符的空间。ssize_t是int。备选方案: ssize_t nread;nread = recvfrom(sfd,buf,BUF_SIZE,0,(struct sockaddr *) &peer_addr,&peer_addr_len);// printf(">>> %d\n",nread);printf(">>> %lld\n",(长) nread);/或printf(">>> %jd\n",(intmax_t) nread);https://codereview.stackexchange.com/questions/195766
复制相似问题