首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用zlib压缩的boost socket iostreams回波服务器休眠,直到连接被关闭。

使用zlib压缩的boost socket iostreams回波服务器休眠,直到连接被关闭。
EN

Stack Overflow用户
提问于 2021-01-18 10:21:11
回答 2查看 304关注 0票数 2

我尝试使用zlib压缩创建简单的回显服务器,如示例所示。

我的想法是现在发送一些字符串,因为我可以在发送之前将POD类型转换为string (std::string(reinterpret_cast<const char *>(&pod), sizeof(pod))),然后我将确保传输层正常工作。

这里有个问题。客户端压缩数据,发送数据,并表示数据已发送,但服务器在读取数据时被阻塞。我不明白为什么会这样。

我尝试将operator<<out.flush()结合使用,也尝试使用boost::iostreams::copy()。结果是一样的。代码示例是(我根据args为服务器和客户端使用相同的源文件):

代码语言:javascript
复制
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <sstream>

namespace ip = boost::asio::ip;
using ip::tcp;

const unsigned short port = 9999;
const char host[] = "127.0.0.1";

void receive()
{
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::stringstream buffer;

    std::cout << "start session" << std::endl;

    try
    {
        for (;;)
        {
            {
                boost::iostreams::filtering_istream in;
                in.push(boost::iostreams::zlib_decompressor());
                in.push(stream);

                std::cout << "start reading" << std::endl;

                // looks like server is blocked here
                boost::iostreams::copy(in, buffer);
            }

            std::cout << "data: " << buffer.str() << std::endl;

            {
                boost::iostreams::filtering_ostream out;
                out.push(boost::iostreams::zlib_compressor());
                out.push(stream);

                boost::iostreams::copy(buffer, out);
            }

            std::cout << "Reply is sended" << std::endl;
        }
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << e.error() << '\n';
        stream.close();
    }
}

void send(const std::string &data)
{
    tcp::endpoint ep(ip::address::from_string(host), port);
    
    tcp::iostream stream;
    stream.connect(ep);

    std::stringstream buffer;
    buffer << data;

    if (!stream)
    {
        std::cerr << "Cannot connect to " << host << ":" << port << std::endl;
        return;
    }

    try
    {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(stream);

            out << buffer.str();
            out.flush();
        }

        std::cout << "sended: " << data << std::endl;
        buffer.str("");

        {
            boost::iostreams::filtering_istream in;
            in.push(boost::iostreams::zlib_decompressor());
            in.push(stream);

            // looks like client is blocked here
            boost::iostreams::copy(in, buffer);
        }

        std::cout << "result: " << buffer.str() << std::endl;
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << '\n';
    }
}

int main(int argc, const char *argv[])
{
    if (argc > 1 && argv[1] ==  std::string("sender"))
        send("hello world");
    else
        receive();

    return 0;
}

首先启动服务器,然后启动客户端。产生了下列产出:

服务器

代码语言:javascript
复制
$ ./example
# now it waits while client will be accepted
start session
start reading

客户端

代码语言:javascript
复制
$ ./example sender
sended: hello world

程序会被上面的输出阻塞。我猜服务器仍然在等待来自客户端的数据,并且它不知道客户机发送了它所拥有的所有信息。

如果我用Ctrl + C关闭客户端,则输出如下:

代码语言:javascript
复制
$ ./example
# now it waits while client will be accepted
start session
start reading
# now it is blocked until I press Ctrl + C
data: hello world
Reply is sended
start reading
zlib error-5

代码语言:javascript
复制
$ ./example sender
sended: hello world
^C

我猜zlib error-5是因为服务器认为存档是不完整的。

预期的行为没有阻塞。启动客户端时,该消息必须出现在服务器程序输出中。

为什么程序在阅读时被屏蔽了?我怎么才能修好它?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-01-18 14:56:05

iostreams::copy就是这样做的:它复制流。

恭维你的代码。它非常可读的:)它让我想起了这个答案,用boost iostream套接字读写文件。主要的区别是,这个答案发送一个压缩的blob并关闭。

您是“正确的”,解压缩器知道何时一个压缩块是完整的,但它并不决定另一个将不会跟随。

所以你需要增加框架。传统的方法是通过一个长度的带外.通过使用IO操作器,我实现了这些更改,同时也减少了代码重复。

代码语言:javascript
复制
template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) ;
    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) ;
};

代码语言:javascript
复制
template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) ;
    friend std::istream& operator>>(std::istream& is, ZLIB z) ;
};

ZLIB机械手

此代码主要封装在发送方/接收方之间复制的代码:

代码语言:javascript
复制
template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(os);
            out << z.data << std::flush;
        }
        return os.flush();
    }

    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;
        in.push(boost::iostreams::zlib_decompressor());
        in.push(is);

        std::ostringstream oss;
        copy(in, oss);
        z.data = oss.str();

        return is;
    }
};

我制作了T模板,这样它就可以根据需要存储std::string&std::string const&

LengthPrefixed机械手

这个机械手并不关心正在序列化的是什么,只是简单地在其前缀加上线路上的有效长度:

代码语言:javascript
复制
template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();

        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
    }

    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;

            std::string on_the_wire(len, '\0');
            if (is.read(on_the_wire.data(), on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
            }
        }
        return is;
    }
};

我们添加了一个微妙之处:通过存储一个引用或值,取决于我们使用的是什么构造,我们还可以接受临时人员(比如ZLIB机械手):

代码语言:javascript
复制
template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;

我没有想过让ZLIB机械手同样通用。所以我把它作为驱魔仪式留给读者

演示程序

将这两者结合起来,您可以简单地将发送方/接收方写成:

代码语言:javascript
复制
void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::cout << "start session" << std::endl;

    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
    }
}

void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);

    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;

    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;
}

实际上,它打印:

代码语言:javascript
复制
reader: start session
sender: Writing length 19
reader: Reading length 19
sender: sent: "hello world"
reader: data: "hello world"
reader: Writing length 19
sender: Reading length 19
sender: result: "hello world"

完整上市

代码语言:javascript
复制
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <iomanip>
#include <sstream>

namespace ip = boost::asio::ip;
using ip::tcp;

const unsigned short port = 9999;
const char host[] = "127.0.0.1";

#ifdef DEBUG
    std::ostream debug(std::cerr.rdbuf());
#else
    std::ostream debug(nullptr);
#endif

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();

        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
    }

    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;

            std::string on_the_wire(len, '\0');
            if (is.read(on_the_wire.data(), on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
            }
        }
        return is;
    }
};

template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(os);
            out << z.data << std::flush;
        }
        return os.flush();
    }

    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;
        in.push(boost::iostreams::zlib_decompressor());
        in.push(is);

        std::ostringstream oss;
        copy(in, oss);
        z.data = oss.str();

        return is;
    }
};

void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::cout << "start session" << std::endl;

    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
    }
}

void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);

    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;

    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;
}

int main(int argc, const char**) {
    try {
        if (argc > 1)
            client("hello world");
        else
            server();
    } catch (const std::exception& e) {
        std::cerr << e.what() << '\n';
    }
}
票数 1
EN

Stack Overflow用户

发布于 2021-01-18 14:50:39

使用boost::serialization解决问题的步骤如下:

  1. 首先,我移动到像这样的函数:
代码语言:javascript
复制
namespace io = boost::iostreams;

namespace my {
std::string compress(const std::string &data)
{
    std::stringstream input, output;

    input << data;

    io::filtering_ostream io_out;
    io_out.push(io::zlib_compressor());
    io_out.push(output);

    io::copy(input, io_out);

    return output.str();
}

std::string decompress(const std::string &data)
{
    std::stringstream input, output;

    input << data;

    io::filtering_istream io_in;
    io_in.push(io::zlib_decompressor());
    io_in.push(input);

    io::copy(io_in, output);

    return output.str();
}
} // namespace my
  1. 然后,我为字符串缓冲区创建了一个包装器,如下所示(遵循文档的教程):
代码语言:javascript
复制
class Package
{
public:
    Package(const std::string &buffer) : buffer(buffer) {}

private:
    std::string buffer;

    friend class boost::serialization::access;

    template<class Archive>
    void serialize(Archive &ar, const unsigned int)
    {
        ar & buffer;
    }

};
  1. 最后,在阅读和发送之前,我附加了序列化。
代码语言:javascript
复制
/**
 * receiver
 */
Package request;

{
    boost::archive::text_iarchive ia(*stream);
    ia >> request;
}

std::string data = my::decompress(request.buffer);

// do something with data

Package response(my::compress(data));

{
    boost::archive::text_oarchive oa(*stream);
    oa << response;
}

/**
 * sender
 */
std::string data = "hello world";
Package package(my::compress(data));

// send request
{
    boost::archive::text_oarchive oa(*m_stream);
    oa << package;
}

// waiting for a response
{
    boost::archive::text_iarchive ia(*m_stream);
    ia >> package;
}

// decompress response buffer
result = my::decompress(package.get_buffer());
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65772623

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档