首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >具有多线程应用的Nanomsg非阻塞双向插座

具有多线程应用的Nanomsg非阻塞双向插座
EN

Stack Overflow用户
提问于 2019-09-07 14:28:53
回答 1查看 1.1K关注 0票数 1

我使用Nanomsg在我的系统中使用C++进行IPC。我想要创建一个后台线程来处理消息发送和recv。我使用对范式,并使用nn_poll检查套接字fd是否可写或可读,如果可读,则从消息队列中弹出一项并发送。我的问题是,由于nn_poll循环中没有睡眠,所以我创建的后台线程正在使用更多的CPU,有什么方法可以减少CPU的使用,但仍然使延迟就像没有睡眠一样?下面是我的示例代码。谢谢。

Server.cpp

代码语言:javascript
复制
#include <iostream>
#include <thread>
#include <string>
#include <queue>
#include <utility>
#include <mutex>
#include <nanomsg/pair.h>
#include <nanomsg/nn.h>

class Nanomsg {
private:
    bool _server;
    bool _stop;

    int _sock;
    std::string _url;
    std::thread _th;
    std::queue<std::string> _queue;

    std::mutex _queueMutex;

    void _start() {
        _sock = nn_socket(AF_SP, NN_PAIR);

        if (_sock < 0) {
            std::cout << "failed to create socket" << std::endl;
            return;
        }

        int rc = 0;

        if (_server) {
            rc = nn_bind(_sock, _url.c_str());
        } else {
            rc = nn_connect(_sock, _url.c_str());
        }

        if (rc < 0) {
            std::cout << "failed to connect/bind socket" << std::endl;
            return;
        }

        struct nn_pollfd pfd{};
        pfd.fd = _sock;
        pfd.events = NN_POLLIN | NN_POLLOUT;

        while (!_stop) {
            std::cout << "ssasd" << std::endl;
            rc = nn_poll(&pfd, 1, 2000);

            if (rc == 0) {
                std::cout << "timeout" << std::endl;
                continue;
            }

            if (rc == -1) {
                std::cout << "error!" << std::endl;
                return;
            }

            if (pfd.revents & NN_POLLIN) {
                char *buf = nullptr;
                int rbs = nn_recv(_sock, &buf, NN_MSG, 0);

                if (rbs < 0) {
                    continue;
                }

                std::string r(buf, rbs);

                std::cout << "received [" << r << "]" << std::endl;

                nn_freemsg(buf);
            }

            if (pfd.revents & NN_POLLOUT) {
                std::cout << "asd" << std::endl;
                if (_queue.empty()) {
                    continue;
                }

                {
                    std::lock_guard<std::mutex> lock(_queueMutex);
                    auto msg = _queue.front();

                    std::cout << "send [" << msg << "]" << std::endl;

                    rc = nn_send(_sock, msg.c_str(), msg.length(), 0);
                    if (rc >= 0) {
                        _queue.pop();
                    }
                }
            }
        }

    }

public:
    Nanomsg() : _sock(0), _server(false), _stop(false), _url("ipc:///tmp/test.ipc") {

    }

    Nanomsg(std::string url, bool server) : _url(std::move(url)), _sock(0), _server(server), _stop(false) {

    }

    void start() {
        _th = std::thread([=]() {
            _start();
        });
    }

    void stop() {
        _stop = true;

        if (_th.joinable()) {
            _th.join();
        }
    }

    void send(const std::string& msg) {
        {
            std::lock_guard<std::mutex> lock(_queueMutex);
            _queue.push(msg);
        }
    }

};

int main() {

    Nanomsg server("ipc:///tmp/test.ipc", true);

    server.start();

    while (true) {
        server.send("test");
        std::this_thread::sleep_for(std::chrono::seconds(3));
    }

    return 0;
}

Client.cpp

代码语言:javascript
复制
#include <iostream>
#include <thread>
#include <string>
#include <queue>
#include <utility>
#include <mutex>
#include <nanomsg/pair.h>
#include <nanomsg/nn.h>

struct nn_pollf {
    int fd;
    short events;
    short revents;
};

class Nanomsg {
private:
    bool _server;
    bool _stop;

    int _sock;
    std::string _url;
    std::thread _th;
    std::queue<std::string> _queue;

    std::mutex _queueMutex;

    void _start() {
        _sock = nn_socket(AF_SP, NN_PAIR);

        if (_sock < 0) {
            std::cout << "failed to create socket" << std::endl;
            return;
        }

        int rc = 0;

        if (_server) {
            rc = nn_bind(_sock, _url.c_str());
        } else {
            rc = nn_connect(_sock, _url.c_str());
        }

        if (rc < 0) {
            std::cout << "failed to connect/bind socket" << std::endl;
            return;
        }

        struct nn_pollfd pfd{};
        pfd.fd = _sock;
        pfd.events = NN_POLLIN | NN_POLLOUT;

        while (!_stop) {
            std::cout << "ssasd" << std::endl;
            rc = nn_poll(&pfd, 1, 2000);

            if (rc == 0) {
                std::cout << "timeout" << std::endl;
                continue;
            }

            if (rc == -1) {
                std::cout << "error!" << std::endl;
                return;
            }

            if (pfd.revents & NN_POLLIN) {
                char *buf = nullptr;
                int rbs = nn_recv(_sock, &buf, NN_MSG, 0);

                if (rbs < 0) {
                    continue;
                }

                std::string r(buf, rbs);

                std::cout << "received [" << r << "]" << std::endl;

                nn_freemsg(buf);
            }

            if (pfd.revents & NN_POLLOUT) {
                std::cout << "asd" << std::endl;
                if (_queue.empty()) {
                    continue;
                }

                {
                    std::lock_guard<std::mutex> lock(_queueMutex);
                    auto msg = _queue.front();

                    std::cout << "send [" << msg << "]" << std::endl;

                    rc = nn_send(_sock, msg.c_str(), msg.length(), 0);
                    if (rc >= 0) {
                        _queue.pop();
                    }
                }
            }
        }

    }

public:
    Nanomsg() : _sock(0), _server(false), _stop(false), _url("ipc:///tmp/test.ipc") {

    }

    Nanomsg(std::string url, bool server) : _url(std::move(url)), _sock(0), _server(server), _stop(false) {

    }

    void start() {
        _start();
//        _th = std::thread([=]() {
//            _start();
//        });
    }

    void stop() {
        _stop = true;

        if (_th.joinable()) {
            _th.join();
        }
    }

    void send(const std::string& msg) {
        {
            std::lock_guard<std::mutex> lock(_queueMutex);
            _queue.push(msg);
        }
    }

};

int main() {

    Nanomsg client("ipc:///tmp/test.ipc", false);

    client.start();

    return 0;
}
EN

回答 1

Stack Overflow用户

发布于 2019-09-10 20:37:40

如果没有什么要发送的,也没有什么可接收的,那就让你的线程休眠一毫秒。在你目前的设计中,这是你唯一能做的事。

如果可能的话,您可以使用下一代纳米粒子(nng)并给它的异步接口一个机会。似乎您正在自己实现一个异步接口,那么为什么不也使用nanomsg呢?它们具有操作系统网络API的所有特性,因此应该能够在不浪费CPU时间的情况下提供最佳的延迟。

创建异步I/O句柄,并使用nng_aio_alloc(3)设置回调。打电话给nng_recv_aio(3),以便在接收数据时得到通知。不要管理您自己的发送队列,而是在nng_send_aio(3)中使用void Nanomsg::send()

不幸的是,nng是一个单独的库,您正在使用经典的nanomsg。我注意到只有中文字..。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57834618

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档