首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >利用C++进行自主学习的pub/sub玩具应用

利用C++进行自主学习的pub/sub玩具应用
EN

Code Review用户
提问于 2022-08-01 23:51:35
回答 1查看 449关注 0票数 4

首先,我对冗长的帖子表示歉意。我不确定我应该包括哪些代码。

在c++的实践中,我正在使用c++17为pub/sub编写一个玩具应用程序。为了保持简单,发布服务器和订阅服务器都在同一个OS进程上,因此只需要一个deque来存储消息。

我不使用单例,而是跨构造函数创建一个shared_ptrstd::move

我尝试在我熟悉的任何地方使用c++17成语;例如,在std::scoped_lock vs std::lock_guard和lambda over std::bind中。如果代码中还有其他可以升级的地方,请注意。

我使用c++的主要困难之一是不清楚哪些技术是c++11,哪些是modern

沿着这些思路,你知道谷歌C++风格指南为什么禁止使用c++20吗?尤其是由于c++23正在快速逼近。

第一个类充当Queue,具有回调机制。

代码语言:javascript
复制
#pragma once

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

namespace tareeqav {
    namespace messaging {

        class QueueBase {
        public:
            virtual ~QueueBase() {};
        };

        template
        class Queue : public QueueBase {
        public:
            Queue() {
                std::thread t = std::thread([this]() {
                    this->RunCallbacks();
                });
                t.detach();
            };

            void EnqueueMessage(std::shared_ptr message);

            const int length() { return messages_.size(); };

            template
            void RegisterCallback(CallbackT &&callback);

            void RunCallbacks();

        private:
            std::mutex mutex_;
            std::string id_;
            std::deque> messages_ = {};
            std::vector)>> callbacks_ = {};
            std::chrono::time_point prev = std::chrono::system_clock::now();

            std::shared_ptr DequeueMessage();
        };

        template
        void Queue::EnqueueMessage(std::shared_ptr message) {
            messages_.push_back(message);
        };

        template
        std::shared_ptr Queue::DequeueMessage() {
            if (length() > 0) {
                auto item = messages_.front();
                messages_.pop_front();
                return item;
            }
            return nullptr;
        };

        template
        template
        void Queue::RegisterCallback(CallbackT &&callback) {
            std::scoped_lock lock(mutex_);
            callbacks_.template emplace_back(std::forward(callback));
        };

        template
        void Queue::RunCallbacks() {
            while (true) {
                std::chrono::time_point now = std::chrono::system_clock::now();
                auto milliseconds = std::chrono::duration_cast(prev - now);
                auto ms = 100 - milliseconds.count();
                while (ms > 0) {
                    --ms;
                }
                if (callbacks_.size() > 0 && messages_.size() > 0) {
                    std::scoped_lock lock(mutex_);
                    std::shared_ptr message = DequeueMessage();
                    for (auto &&callback: callbacks_) {
                        std::thread t = std::thread([&callback, &message]() {
                            callback(message);
                        });
                        t.detach();
                    }
                }
                prev = std::chrono::system_clock::now();
            }
        }
    } // namespace messaging
} // namespace tareeqav

Broker类充当发布/订阅和存储数据之间的中间层。

代码语言:javascript
复制
#pragma once

#include 
#include 
#include 
#include 

#include "tareeqav/messaging/queue.h"
#include "tareeqav/messaging/proto/control_message.h"

namespace tareeqav {
    namespace messaging {

        enum class TopicStatus {
            TOPIC_CREATED,
            TOPIC_EXISTS
        };

        enum class MessageStatus {
            QUEUED,
            PENDING,
            PUBLISHED,
            TOPIC_NOT_FOUND
        };

        class Broker {
        public:

            template
            TopicStatus CreateTopic(std::string topicName);

            template
            MessageStatus PublishMessage(std::string topicName, std::shared_ptr message);

            template
            void AddSubscription(
                    std::string topicName,
                    CallbackT &&callback);

        private:
            template
            std::shared_ptr> get(std::string topicName) {
                auto base = queues_[topicName];
                return std::dynamic_pointer_cast>(base);
            }

            std::vector threads_;
            std::unordered_map> queues_;
        };

        template
        TopicStatus Broker::CreateTopic(std::string topicName) {
            // check if we already have a topic with that name
            if (queues_.find(topicName) == queues_.end()) {
                queues_[topicName] = std::make_shared>();
                return TopicStatus::TOPIC_CREATED;
            }
            return TopicStatus::TOPIC_EXISTS;
        };

        template
        MessageStatus Broker::PublishMessage(std::string topicName, std::shared_ptr message) {
            if (queues_.find(topicName) != queues_.end()) {
                auto queue = get(topicName);
                std::scoped_lock lock(queue->mutex_);
                queue->EnqueueMessage(message);
                return MessageStatus::QUEUED;
            }
            return MessageStatus::TOPIC_NOT_FOUND;
        }

        template
        void Broker::AddSubscription(
                std::string topicName,
                CallbackT &&callback) {
            if (queues_.find(topicName) != queues_.end()) {
                auto queue = get(topicName);
                queue->RegisterCallback(std::forward(callback));

            }
        }
    } // namespace messaging
} // namespace tareeqav

Subscriber类是管理添加订阅的简单示例。

代码语言:javascript
复制
#pragma once

#include 
#include 
#include 
#include 

#include "tareeqav/messaging/broker.h"

namespace tareeqav {
    namespace nodes {

        using tareeqav::messaging::Broker;
        using tareeqav::messaging::TopicStatus;
        using tareeqav::messaging::MessageStatus;

        class Subscriber {
        public:
            Subscriber(
                    std::shared_ptr broker,
                    std::string topicName);

            template
            void AddSubscription(CallbackT &&callback);

        private:
            const std::string topicName_;
            std::shared_ptr broker_;
        };

        Subscriber::Subscriber(
                std::shared_ptr broker,
                std::string topicName) :
                topicName_(std::move(topicName)), broker_(std::move(broker)) {
        };

        template
        void Subscriber::AddSubscription(CallbackT &&callback) {
            broker_->template AddSubscription(
                    topicName_,
                    std::forward(callback));
        }
    } // namespace node
} // namespace tareeqav

Publisher类创建消息。

代码语言:javascript
复制
#pragma once

#include 
#include 
#include 
#include 

#include "tareeqav/messaging/broker.h"

namespace tareeqav {
    namespace nodes {
        using tareeqav::messaging::Broker;
        using tareeqav::messaging::TopicStatus;
        using tareeqav::messaging::MessageStatus;

        template
        class Publisher {
        public:
            Publisher(
                    std::shared_ptr broker,
                    std::string topicName);

            void CreateTopic() {
                TopicStatus status = broker_->template CreateTopic(topicName_);
                switch (status) {
                    case TopicStatus::TOPIC_CREATED:
                        std::cout << "topic created" << std::endl;
                        break;
                    case TopicStatus::TOPIC_EXISTS:
                        std::cout << "topic exists" << std::endl;
                        break;
                }
            }

            MessageStatus PublishMessage(
                    std::string topicName,
                    std::shared_ptr message);

        private:
            const std::string topicName_;
            std::shared_ptr broker_;
        };

        template
        Publisher::Publisher(
                std::shared_ptr broker,
                std::string topicName):
                topicName_(std::move(topicName)), broker_(std::move(broker)) {

        };

        template
        MessageStatus Publisher::PublishMessage(
                const std::string topicName,
                std::shared_ptr message) {
            std::cout << "publishing message " << message << std::endl;
            broker_->template PublishMessage(std::move(topicName), std::move(message));
            return MessageStatus::QUEUED;
        };
    } // namespace node
} // namespace tareeqav

下面继承的基管道类

代码语言:javascript
复制
#pragma once

#include 
#include 

#include "tareeqav/messaging/broker.h"

namespace tareeqav {
    using tareeqav::messaging::Broker;

    class Pipeline {
    public:
        Pipeline() = delete;

        Pipeline(std::shared_ptr broker) : broker_(std::move(broker)) {};

    protected:
        std::shared_ptr broker_ = nullptr;
    };

}

此管道是测试消息的使用者。管道既可以是消息的消费者,也可以是消息的生产者。

代码语言:javascript
复制
#pragma once

#include 
#include "tareeqav/pipelines/pipeline.h"
#include "tareeqav/messaging/proto/control_message.h"
#include "tareeqav/pipelines/perception/control_subscriber.h"

namespace tareeqav {
    namespace perception {
        using std::placeholders::_1;

        class PerceptionPipeline : public Pipeline {
        public:
            PerceptionPipeline(const std::shared_ptr broker) : Pipeline(std::move(broker)) {
                std::string topicName = std::string("/control_message");
                subscriber_ = std::make_shared<
                        nodes::Subscriber>(broker, topicName);

                controlCallback = std::make_shared();

                subscriber_->AddSubscription(
                        [this](auto &&T) { controlCallback->MessageCallback(std::forward(T)); }
                );

            };
        private:
            std::shared_ptr subscriber_ = nullptr;
            std::shared_ptr controlCallback = nullptr;
        };

    } // namespace perception

} // namespace tareeqav

此管道发布一条消息。

代码语言:javascript
复制
#pragma once

#include "tareeqav/pipelines/pipeline.h"
#include "tareeqav/node/publisher.h"
#include "tareeqav/messaging/proto/control_message.h"

namespace tareeqav {
    namespace planning {

        class PlanningPipeline : public Pipeline {
        public:
            PlanningPipeline(std::shared_ptr broker) : Pipeline(std::move(broker)) {
                std::string topicName = std::string("/control_message");
                controlPublisher_ = std::make_shared>(
                        broker_, topicName);

                controlPublisher_->CreateTopic();

                auto message = std::make_shared(.9, .1);

                controlPublisher_->PublishMessage(topicName, message);
            };
        private:
            std::shared_ptr> controlPublisher_ = nullptr;

        };
    } // namespace perception
} // namespace tareeqav

控制消息(它将很快被protobuf所取代)

代码语言:javascript
复制
#pragma once

namespace tareeqav
{
    namespace messaging
    {
class ControlMessage
{
public:
    ControlMessage(){};
    ControlMessage(float a, float s): acceleration(a), steering(s){};
    float acceleration;
    float steering;
};
    } // namespace perception
} // namespace tareeqav

这是带有入口点的司机。

代码语言:javascript
复制
#include 
#include "tareeqav/messaging/broker.h"
#include "tareeqav/pipelines/planning/planning.h"
#include "tareeqav/pipelines/perception/perception.h"

using tareeqav::messaging::Broker;
using tareeqav::planning::PlanningPipeline;
using tareeqav::perception::PerceptionPipeline;

int main(int argc, char *argv[]) {
    auto broker = std::make_shared();
    auto planningPipeline = std::make_shared(PlanningPipeline(broker));
    auto perceptionPipeline = std::make_shared(PerceptionPipeline(broker));

    while (true) {};

    return 0;
}
EN

回答 1

Code Review用户

回答已采纳

发布于 2022-08-02 19:54:18

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/278528

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档