双方通过网络进行通信,来回发送多达100条消息。发送的每一条消息都是对最后一条收到的消息的响应,并取决于它。这意味着双方都必须等待反应的到来,然后才能进行计算。速度是非常关键的,当网络有适当的延迟时(由于大量的消息和不可避免的等待响应),速度会很快恶化。消息不是很大,因此带宽似乎并不重要。
也许我想要做的事情可以通过使用一些库来实现。如果是这样的话,请向我指出这一点,最好与演示或链接到如何使用它的资源。由于缺少更好的替代方案,我在这里使用了(POSIX-API) TCP套接字。我决定使用TCP而不是UDP,因为所有数据都必须按照正确的顺序接收,而且数据包头大小不是相关的开销,特别是因为延迟是问题所在。
这是我第一次使用套接字,我肯定犯了很多错误,这都是针对套接字和纯C++的(我使用的是C++17,尽管代码也使用C++11编译)。我的问题似乎非常标准,而且大部分代码都是从一些关于套接字的教程中弄不清楚的,但我很难找到关于最佳实践的详细源代码。
下面是一个简单的演示代码,它说明了我处理TCP逻辑的方式。我试着尽量缩短它,但它仍然很长。一些评论:
tcp_helpers.h声明(并定义此post的简洁性)包含所有TCP逻辑的函数。另外两个代码文件是一个示例应用程序(运行服务器和客户端的主要方法)。在我的实际代码中,我将TCP逻辑封装在类中,这些类内部调用这里显示的函数。有些问题我已经问过了:
send_full_message函数的第一个版本(参见链接的堆栈溢出问题)是对send进行两次sys调用,一次是针对(自定义)头(8字节结构),一次是针对实际消息(结构数组)。在这个版本中,我将其简化为单个sys调用,方法是将头和数据复制到缓冲区中(使用可能丑陋的C风格内存操作)。我没有注意到性能与原始文件(将报头作为单独的数据包发送)有什么不同。哪种方法是可选的?能否更优雅地做到这一点?receive_structs函数中的指针算法看起来很难看。这里最好的解决方案是什么?// tcp_helpers.h.
// NOTE: Requires C++11, tested also with C++17. Using this code in the present form may be ill-advised.
// This is not a true header file (contains all method definitions for brevity).
#include <vector>
#include <iostream>
#include <string>
#include <sstream>
#include <cerrno> // for checking socket error messages
#include <cstdint> // for fixed length integer types
#include <cstring> // for memcpy
#include <unistd.h> // POSIX specific
#include <sys/socket.h> // POSIX specific
#include <netinet/in.h> // POSIX specific
#include <netinet/tcp.h> // POSIX specific
#include <arpa/inet.h> // POSIX specific
//////////////////// PROFILING ///////////////////
#include <chrono>
static auto start = std::chrono::high_resolution_clock::now();
// print a message with timestamp for rudimentary profiling. (I don't actually use this in my code)
void print_now(const std::string &message) {
auto t2 = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> time_span = t2 - start;
std::cout << time_span.count() << ": " << message << std::endl;
}
//////////////////// PROFILING ///////////////////
struct TCPMessageHeader { // Header for each message (I really use this).
uint8_t protocol_name[4];
uint32_t message_bytes;
};
struct ServerSends { // The server sends messages that are arrays of this struct (just an example).
uint16_t a;
uint32_t b;
uint32_t c;
};
typedef uint8_t ClientSends; // The client sends messages that are arrays of this (just an example).
namespace TCP_Helpers {
void disable_nagles_algorithm(int socket_fd) {
const int enable_no_delay = 1; // Disable Nagle's algorithm for TCP socket to improve performance
if (setsockopt(socket_fd, IPPROTO_TCP, TCP_NODELAY, &enable_no_delay, sizeof(enable_no_delay))) {
throw std::runtime_error("Failed to disble Nagle's algorithm by setting socket options");
}
}
int init_client(const std::string &ip_address, int port) {
int sock_fd;
struct sockaddr_in serv_addr{};
if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
throw std::runtime_error("TCP Socket creation failed\n");
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port);
// Convert IPv4 address from text to binary form
if (inet_pton(AF_INET, ip_address.c_str(), &serv_addr.sin_addr) <= 0) {
throw std::runtime_error("Invalid address/ Address not supported for TCP connection\n");
}
if (connect(sock_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
throw std::runtime_error("Failed to connect to server.\n");
}
disable_nagles_algorithm(sock_fd);
return sock_fd;
}
int init_server(int port) {
int server_fd;
int new_socket;
struct sockaddr_in address{};
int opt = 1;
int addrlen = sizeof(address);
// Creating socket file descriptor
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
throw std::runtime_error("socket creation failed\n");
}
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
throw std::runtime_error("failed to set socket options");
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(port);
// Forcefully attaching socket to the port
if (bind(server_fd, (struct sockaddr *) &address, sizeof(address)) < 0) {
throw std::runtime_error("bind failed");
}
if (listen(server_fd, 3) < 0) {
throw std::runtime_error("listen failed");
}
if ((new_socket = accept(server_fd, (struct sockaddr *) &address, (socklen_t *) &addrlen)) < 0) {
throw std::runtime_error("accept failed");
}
if (close(server_fd)) // don't need to listen for any more tcp connections (PvP connection).
throw std::runtime_error("closing server socket failed");
disable_nagles_algorithm(new_socket);
return new_socket;
}
template<typename NakedStruct>
void send_full_message(int fd, TCPMessageHeader header_to_send, const std::vector<NakedStruct> &structs_to_send) {
const size_t num_message_bytes = sizeof(NakedStruct) * structs_to_send.size();
if (header_to_send.message_bytes != num_message_bytes) {
throw std::runtime_error("Trying to send struct vector whose size does not"
" match the size claimed by message header");
}
print_now("Begin send_full_message");
// copy header and structs vector contents to new vector (buffer) of bytes and sent via TCP.
// Does not seem to be faster than sending two separate packets for header/message. Can the copy be avoided?
std::vector<uint8_t> full_msg_packet(sizeof(header_to_send) + num_message_bytes);
memcpy(full_msg_packet.data(), &header_to_send, sizeof(header_to_send));
memcpy(full_msg_packet.data() + sizeof(header_to_send), structs_to_send.data(), num_message_bytes);
// maybe need timeout and more error handling?
size_t bytes_to_send = full_msg_packet.size() * sizeof(uint8_t);
int send_retval;
while (bytes_to_send != 0) {
send_retval = send(fd, full_msg_packet.data(), sizeof(uint8_t) * full_msg_packet.size(), 0);
if (send_retval == -1) {
int errsv = errno; // from errno.h
std::stringstream s;
s << "Sending data failed (locally). Errno:" << errsv
<< " while sending header of size" << sizeof(header_to_send)
<< " and data of size " << header_to_send.message_bytes << ".";
throw std::runtime_error(s.str());
}
bytes_to_send -= send_retval;
}
print_now("end send_full_message.");
}
template<typename NakedStruct>
std::vector<NakedStruct> receive_structs(int fd, uint32_t bytes_to_read) {
print_now("Begin receive_structs");
unsigned long num_structs_to_read;
// ensure expected message is non-zero length and a multiple of the SingleBlockParityRequest struct
if (bytes_to_read > 0 && bytes_to_read % sizeof(NakedStruct) == 0) {
num_structs_to_read = bytes_to_read / sizeof(NakedStruct);
} else {
std::stringstream s;
s << "Message length (bytes_to_read = " << bytes_to_read <<
" ) specified in header does not divide into required stuct size (" << sizeof(NakedStruct) << ").";
throw std::runtime_error(s.str());
}
// vector must have size > 0 for the following pointer arithmetic to work
// (this method must check this in above code).
std::vector<NakedStruct> received_data(num_structs_to_read);
int valread;
while (bytes_to_read > 0) // need to include some sort of timeout?!
{
valread = read(fd,
((uint8_t *) (&received_data[0])) +
(num_structs_to_read * sizeof(NakedStruct) - bytes_to_read),
bytes_to_read);
if (valread == -1) {
throw std::runtime_error("Reading from socket file descriptor failed");
} else {
bytes_to_read -= valread;
}
}
print_now("End receive_structs");
return received_data;
}
void send_header(int fd, TCPMessageHeader header_to_send) {
print_now("Start send_header");
int bytes_to_send = sizeof(header_to_send);
int send_retval;
while (bytes_to_send != 0) {
send_retval = send(fd, &header_to_send, sizeof(header_to_send), 0);
if (send_retval == -1) {
int errsv = errno; // from errno.h
std::stringstream s;
s << "Sending data failed (locally). Errno:" << errsv << " while sending (lone) header.";
throw std::runtime_error(s.str());
}
bytes_to_send -= send_retval;
}
print_now("End send_header");
}
TCPMessageHeader receive_header(int fd) {
print_now("Start receive_header (calls receive_structs)");
TCPMessageHeader retval = receive_structs<TCPMessageHeader>(fd, sizeof(TCPMessageHeader)).at(0);
print_now("End receive_header (calls receive_structs)");
return retval;
}
}// main_server.cpp
#include "tcp_helpers.h"
int main() {
int port = 20000;
int socket_fd = TCP_Helpers::init_server(port);
while (true) { // server main loop
TCPMessageHeader rcv_header = TCP_Helpers::receive_header(socket_fd);
if (rcv_header.protocol_name[0] == 0) // using first byte of header name as signal to end
break;
// receive message
auto rcv_message = TCP_Helpers::receive_structs<ClientSends>(socket_fd, rcv_header.message_bytes);
// for (ClientSends ex : rcv_message) // example "use" of the received data that takes a bit of time.
// std::cout << static_cast<int>(ex) << " ";
// std::cout << std::endl << std::endl;
auto bunch_of_zeros = std::vector<ServerSends>(1000); // send a "response" containing 1000 structs of zeros
TCPMessageHeader send_header{"abc", 1000 * sizeof(ServerSends)};
TCP_Helpers::send_full_message(socket_fd, send_header, bunch_of_zeros);
}
exit(EXIT_SUCCESS);
}// main_client.cpp
#include "tcp_helpers.h"
int main() {
// establish connection to server and get socket file descriptor.
int port = 20000;
auto ip = "127.0.0.1";
int socket1_fd = TCP_Helpers::init_client(ip, port);
std::cout << "connected." << std::endl;
for (int i = 0; i < 20; ++i) { // repeat (for runtime statistics) sending and receiving arbitrary data
// send a message containing 500 structs of zeros
auto bunch_of_zeros = std::vector<ClientSends>(500);
TCPMessageHeader send_header{"abc", 500 * sizeof(ClientSends)};
TCP_Helpers::send_full_message(socket1_fd, send_header, bunch_of_zeros);
// receive response
TCPMessageHeader rcv_header = TCP_Helpers::receive_header(socket1_fd);
auto rcv_message = TCP_Helpers::receive_structs<ServerSends>(socket1_fd, rcv_header.message_bytes);
// for (ServerSends ex : rcv_message) // example "use" of the received data that takes a bit of time.
// std::cout << ex.a << ex.b << ex.c << " ";
// std::cout << std::endl << std::endl;
}
auto end_header = TCPMessageHeader{}; // initialized all fields to zero. "end" signal in this demonstration.
TCP_Helpers::send_header(socket1_fd, end_header);
exit(EXIT_SUCCESS);
}发布于 2020-08-31 20:15:35
我同意大多数马丁约克写的东西,除了关于二进制协议的评论。有时发送结构是正确的:它非常快,合理紧凑,不需要转换到或从其他格式,这可能会浪费CPU周期,可能需要大量的外部依赖。但是,除非您预先考虑可扩展性,否则您可以轻松地将自己锁在一组结构中,而不可能优雅地迁移到新版本。您的代码只处理预先知道其大小的结构。您可以考虑添加功能来处理具有可变大小的"structs“。
除了我只想补充以下几点:
还有第三种选择,它只使用一个syscall,不需要复制数据,即使用sendmsg。它允许您指定需要通过套接字发送的不连续内存区域的列表,就好像它是一个连续块一样。它需要更多的代码来设置传递给sendmsg()所需的结构,但其中一些可能可以准备一次,然后再重用。
禁用Nagle是以带宽换取延迟。与其这样做,不如考虑使用TCP_CORK。当应用程序知道它想要发送一组数据,并且希望不延迟地发送数据包,但是最好地使用网络MTU时,它应该在这组数据开始时启用TCP_CORK,并且当它发送所有数据时,它禁用TCP_CORK,然后它将确保发送缓冲区中的任何剩余数据都会立即被发送(假设拥塞窗口允许它)。如果要禁用Nagle,并且希望在一行中发送许多小结构,那么每个结构都将作为一个单独的数据包发送,没有什么好的理由。
https://codereview.stackexchange.com/questions/248701
复制相似问题