首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >顺序通信失败,boost::asio::streambuf以二进制方式?

顺序通信失败,boost::asio::streambuf以二进制方式?
EN

Stack Overflow用户
提问于 2018-02-28 16:04:21
回答 1查看 235关注 0票数 1

使用boost::asio,我正在编写网络内容。我试着建立一个简单的发送和接收字符串协议。发送方首先将字符串大小发送给接收方。然后发送方将实际的字符串发送给接收方。

特别是,我设计了以下两个协议。

  • 持有字符串的发送方将其发送给接收方。接收到它时,接收方会显示字符串。
  • 按顺序执行上述协议(两次)。

我构建了上面的协议,如下所示:如果我执行该协议一次,那就很好了。但是,如果我多次执行该协议(例如两次),接收方接收的字符串大小就会出错。

第一次: 1365字节。

第二次: 779073字节。(只需阅读不是779073,而是7790)

我发现os << data_size不是以二进制方式完成的。"779073“只是作为6个字节的字符串发送。但是接收器只读取其中的4字节。如何发送二进制数据并使用boost::asio和boost::asio::streambuf接收二进制数据?

接收机

代码语言:javascript
复制
// socket is already defined
// ** first step: recv data size
boost::asio::streambuf buf;
boost::asio::read(
   socket,
   buf, 
   boost::asio::transfer_exactly(sizeof(uint32_t))
);
std::istream iss(&buf);
uint32_t read_len;
iss >>  read_len;

// ** second step: recv payload based on the data size
boost::asio::streambuf buf2;
read_len = boost::asio::read(socket, buf2, 
boost::asio::transfer_exactly(read_len), error);
cout << "  read "<< read_len << " bytes payload" << endl; 
std::istream is_payload(&buf2);
std::string str;
is_payload >> str;
cout << str << endl; 

发件人

代码语言:javascript
复制
// socket is already defined
string str=...;   // some string to be sent
// ** first step: tell the string size to the reciever
uint32_t data_size = str.size();
boost::asio::streambuf send_buf;
std::ostream os(&send_buf);
os << data_size;
size_t sent_byte = boost::asio::write(socket, send_buf.data());
cout << sent_byte << endl; // debug purpose

// ** second step: send the actual string (payload)
sent_byte = boost::asio::write(socket, boost::asio::buffer(reinterpret_cast<const char*>(&str[0]), data_size));
cout << sent_byte << endl; // debug purpose
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-03-01 03:42:50

您可以发送大小二进制,但这要求您考虑设备和操作系统之间的体系结构差异。

下面是我对实际编码协议的看法:

代码语言:javascript
复制
//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>

namespace ba = boost::asio;
using ba::ip::tcp;
using error_code = boost::system::error_code;

namespace Protocol { // your library

    using net_size_t = boost::endian::big_int32_t; // This protocol uses Big-endian network byte order

    template <typename Derived, typename Token, typename Sig = void(error_code, size_t)> 
    struct base_async_op : std::enable_shared_from_this<Derived> {
        using base_type = base_async_op<Derived, Token, Sig>;

        template <typename DeducedToken>
        base_async_op(DeducedToken &&token) : _token(std::forward<DeducedToken>(token)) {}

        using _Token   = std::decay_t<Token>;
        using _Init    = ba::async_completion<_Token, Sig>;
        using _Handler = typename _Init::completion_handler_type;

        _Token _token;
        _Init _init {_token};

        auto get_allocator() const noexcept { 
            return (boost::asio::get_associated_allocator)(_init.completion_handler);
        }
        using executor_type = ba::associated_executor_t<_Handler>;
        executor_type get_executor() const noexcept {
            return (boost::asio::get_associated_executor)(_init.completion_handler);
        }

        Derived& derived()             { return static_cast<Derived&>(*this);       } 
        Derived const& derived() const { return static_cast<Derived const&>(*this); } 

        template <typename F>
        auto wrap(F&& f) const {
            //std::cout << "WRAP: " << typeid(derived().get_executor()).name() << "\n";
            return ba::bind_executor(derived().get_executor(), std::forward<F>(f));
        }
    };

    template <typename Derived, typename Stream, typename Token, typename Sig = void(error_code, size_t)> 
    struct stream_async_op : base_async_op<Derived, Token, Sig> {
        using base_type = stream_async_op<Derived, Stream, Token, Sig>;

        template <typename DeducedToken>
        stream_async_op(Stream& s, DeducedToken &&token) : base_async_op<Derived, Token, Sig>(std::forward<DeducedToken>(token)), _stream(s)  {}

        Stream& _stream;

        using executor_type = ba::associated_executor_t<typename stream_async_op::_Handler, decltype(std::declval<Stream>().get_executor())>;
        executor_type get_executor() const noexcept {
            return (boost::asio::get_associated_executor)(this->_init.completion_handler, _stream.get_executor());
        }
    };

    template <typename AsyncStream, typename Buffer, typename Token>
    auto async_transmit(AsyncStream& s, Buffer message_buffer, Token&& token) {

        struct op : stream_async_op<op, AsyncStream, Token> {
            using op::base_type::base_type;
            using op::base_type::_init;
            using op::base_type::_stream;

            net_size_t _length[1];

            auto run(Buffer buffer) {
                auto self = this->shared_from_this();
                _length[0] = ba::buffer_size(buffer);

                ba::async_write(_stream, std::vector<ba::const_buffer> { ba::buffer(_length), buffer },
                    this->wrap([self,this](error_code ec, size_t transferred) { _init.completion_handler(ec, transferred); }));

                return _init.result.get();
            }
        };

        return std::make_shared<op>(s, std::forward<Token>(token))->run(message_buffer);
    }

    template <typename AsyncStream, typename Buffer, typename Token>
    auto async_receive(AsyncStream& s, Buffer& output, Token&& token) {

        struct op : stream_async_op<op, AsyncStream, Token> {
            using op::base_type::base_type;
            using op::base_type::_init;
            using op::base_type::_stream;

            net_size_t _length[1] = {0};

            auto run(Buffer& output) {
                auto self = this->shared_from_this();

                ba::async_read(_stream, ba::buffer(_length), this->wrap([self, this, &output](error_code ec, size_t transferred) {
                    if (ec)
                        _init.completion_handler(ec, transferred);
                    else
                        ba::async_read(_stream, ba::dynamic_buffer(output), ba::transfer_exactly(_length[0]),
                            this->wrap([self, this](error_code ec, size_t transferred) { 
                                _init.completion_handler(ec, transferred);
                            }));
                }));

                return _init.result.get();
            }
        };

        return std::make_shared<op>(s, std::forward<Token>(token))->run(output);
    }

    template <typename Output = std::string, typename AsyncStream, typename Token>
    auto async_receive(AsyncStream& s, Token&& token) {

        struct op : stream_async_op<op, AsyncStream, Token, void(error_code, Output)> {
            using op::base_type::base_type;
            using op::base_type::_init;
            using op::base_type::_stream;

            Output _output;
            net_size_t _length[1] = {0};

            auto run() {
                auto self = this->shared_from_this();

                ba::async_read(_stream, ba::buffer(_length), [self,this](error_code ec, size_t) {
                        if (ec)
                            _init.completion_handler(ec, std::move(_output));
                        else
                            ba::async_read(_stream, ba::dynamic_buffer(_output), ba::transfer_exactly(_length[0]),
                                [self,this](error_code ec, size_t) { _init.completion_handler(ec, std::move(_output)); });
                    });

                return _init.result.get();
            }
        };

        return std::make_shared<op>(s, std::forward<Token>(token))->run();
    }

} // Protocol

#include <iostream>
#include <iomanip>

int main() {

    ba::io_context io;
    tcp::socket sock(io);
    sock.connect({tcp::v4(), 6767});

    auto cont = [](auto name, auto continuation = []{}) { return [=](error_code ec, size_t transferred) {
        std::cout << name << " completed (" << transferred << ", " << ec.message() << ")\n";
        if (!ec) continuation();
    }; };
    auto report = [=](auto name) { return cont(name, []{}); };

    // send chain
    std::string hello = "Hello", world = "World";
    Protocol::async_transmit(sock, ba::buffer(hello),
            cont("Send hello", [&] { Protocol::async_transmit(sock, ba::buffer(world), report("Send world")); }
        ));
#ifndef DEMO_USE_FUTURE
    // receive chain
    std::string object1, object2;
    Protocol::async_receive(sock, object1,
            cont("Read object 1", [&] { Protocol::async_receive(sock, object2, report("Read object 2")); }));

    io.run();

    std::cout << "Response object 1: " << std::quoted(object1) << "\n";
    std::cout << "Response object 2: " << std::quoted(object2) << "\n";
#else
    // also possible, alternative completion mechanisms:
    std::future<std::string> fut = Protocol::async_receive(sock, ba::use_future);
    io.run();

    std::cout << "Response object: " << std::quoted(fut.get()) << "\n";
#endif

}

当与测试服务器交谈时,如下所示:

代码语言:javascript
复制
xxd -p -r <<< '0000 0006 4e6f 2077 6179 0000 0005 4a6f 73c3 a90a' | netcat -l -p 6767 | xxd

程序打印

代码语言:javascript
复制
Send hello completed (9, Success)
Send world completed (9, Success)
Read object 1 completed (6, Success)
Read object 2 completed (5, Success)
Response object 1: "No way"
Response object 2: "José"

而netcat的侧印:

代码语言:javascript
复制
00000000: 0000 0005 4865 6c6c 6f00 0000 0557 6f72  ....Hello....Wor
00000010: 6c64                                     ld

启用处理程序跟踪允许您使用handlerviz.pl可视化调用链:

备注您可以将big_int32_t更改为little_int32_t,而无需进行任何进一步更改。当然,您应该更改服务器端的有效负载以匹配: xxd -p -r <<< '0600 0000 4e6f 2077 6179 0500 0000 4a6f 73c3 a90a‘连网猫-l -p 6767 xxd

使用升压端::ntohs::ntohl::htons::htonl

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49033780

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档