首先,我对冗长的帖子表示歉意。我不确定我应该包括哪些代码。
在c++的实践中,我正在使用c++17为pub/sub编写一个玩具应用程序。为了保持简单,发布服务器和订阅服务器都在同一个OS进程上,因此只需要一个deque来存储消息。
我不使用单例,而是跨构造函数创建一个shared_ptr和std::move。
我尝试在我熟悉的任何地方使用c++17成语;例如,在std::scoped_lock vs std::lock_guard和lambda over std::bind中。如果代码中还有其他可以升级的地方,请注意。
我使用c++的主要困难之一是不清楚哪些技术是c++11,哪些是modern。
沿着这些思路,你知道谷歌C++风格指南为什么禁止使用c++20吗?尤其是由于c++23正在快速逼近。
第一个类充当Queue,具有回调机制。
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
namespace tareeqav {
namespace messaging {
class QueueBase {
public:
virtual ~QueueBase() {};
};
template
class Queue : public QueueBase {
public:
Queue() {
std::thread t = std::thread([this]() {
this->RunCallbacks();
});
t.detach();
};
void EnqueueMessage(std::shared_ptr message);
const int length() { return messages_.size(); };
template
void RegisterCallback(CallbackT &&callback);
void RunCallbacks();
private:
std::mutex mutex_;
std::string id_;
std::deque> messages_ = {};
std::vector)>> callbacks_ = {};
std::chrono::time_point prev = std::chrono::system_clock::now();
std::shared_ptr DequeueMessage();
};
template
void Queue::EnqueueMessage(std::shared_ptr message) {
messages_.push_back(message);
};
template
std::shared_ptr Queue::DequeueMessage() {
if (length() > 0) {
auto item = messages_.front();
messages_.pop_front();
return item;
}
return nullptr;
};
template
template
void Queue::RegisterCallback(CallbackT &&callback) {
std::scoped_lock lock(mutex_);
callbacks_.template emplace_back(std::forward(callback));
};
template
void Queue::RunCallbacks() {
while (true) {
std::chrono::time_point now = std::chrono::system_clock::now();
auto milliseconds = std::chrono::duration_cast(prev - now);
auto ms = 100 - milliseconds.count();
while (ms > 0) {
--ms;
}
if (callbacks_.size() > 0 && messages_.size() > 0) {
std::scoped_lock lock(mutex_);
std::shared_ptr message = DequeueMessage();
for (auto &&callback: callbacks_) {
std::thread t = std::thread([&callback, &message]() {
callback(message);
});
t.detach();
}
}
prev = std::chrono::system_clock::now();
}
}
} // namespace messaging
} // namespace tareeqavBroker类充当发布/订阅和存储数据之间的中间层。
#pragma once
#include
#include
#include
#include
#include "tareeqav/messaging/queue.h"
#include "tareeqav/messaging/proto/control_message.h"
namespace tareeqav {
namespace messaging {
enum class TopicStatus {
TOPIC_CREATED,
TOPIC_EXISTS
};
enum class MessageStatus {
QUEUED,
PENDING,
PUBLISHED,
TOPIC_NOT_FOUND
};
class Broker {
public:
template
TopicStatus CreateTopic(std::string topicName);
template
MessageStatus PublishMessage(std::string topicName, std::shared_ptr message);
template
void AddSubscription(
std::string topicName,
CallbackT &&callback);
private:
template
std::shared_ptr> get(std::string topicName) {
auto base = queues_[topicName];
return std::dynamic_pointer_cast>(base);
}
std::vector threads_;
std::unordered_map> queues_;
};
template
TopicStatus Broker::CreateTopic(std::string topicName) {
// check if we already have a topic with that name
if (queues_.find(topicName) == queues_.end()) {
queues_[topicName] = std::make_shared>();
return TopicStatus::TOPIC_CREATED;
}
return TopicStatus::TOPIC_EXISTS;
};
template
MessageStatus Broker::PublishMessage(std::string topicName, std::shared_ptr message) {
if (queues_.find(topicName) != queues_.end()) {
auto queue = get(topicName);
std::scoped_lock lock(queue->mutex_);
queue->EnqueueMessage(message);
return MessageStatus::QUEUED;
}
return MessageStatus::TOPIC_NOT_FOUND;
}
template
void Broker::AddSubscription(
std::string topicName,
CallbackT &&callback) {
if (queues_.find(topicName) != queues_.end()) {
auto queue = get(topicName);
queue->RegisterCallback(std::forward(callback));
}
}
} // namespace messaging
} // namespace tareeqavSubscriber类是管理添加订阅的简单示例。
#pragma once
#include
#include
#include
#include
#include "tareeqav/messaging/broker.h"
namespace tareeqav {
namespace nodes {
using tareeqav::messaging::Broker;
using tareeqav::messaging::TopicStatus;
using tareeqav::messaging::MessageStatus;
class Subscriber {
public:
Subscriber(
std::shared_ptr broker,
std::string topicName);
template
void AddSubscription(CallbackT &&callback);
private:
const std::string topicName_;
std::shared_ptr broker_;
};
Subscriber::Subscriber(
std::shared_ptr broker,
std::string topicName) :
topicName_(std::move(topicName)), broker_(std::move(broker)) {
};
template
void Subscriber::AddSubscription(CallbackT &&callback) {
broker_->template AddSubscription(
topicName_,
std::forward(callback));
}
} // namespace node
} // namespace tareeqavPublisher类创建消息。
#pragma once
#include
#include
#include
#include
#include "tareeqav/messaging/broker.h"
namespace tareeqav {
namespace nodes {
using tareeqav::messaging::Broker;
using tareeqav::messaging::TopicStatus;
using tareeqav::messaging::MessageStatus;
template
class Publisher {
public:
Publisher(
std::shared_ptr broker,
std::string topicName);
void CreateTopic() {
TopicStatus status = broker_->template CreateTopic(topicName_);
switch (status) {
case TopicStatus::TOPIC_CREATED:
std::cout << "topic created" << std::endl;
break;
case TopicStatus::TOPIC_EXISTS:
std::cout << "topic exists" << std::endl;
break;
}
}
MessageStatus PublishMessage(
std::string topicName,
std::shared_ptr message);
private:
const std::string topicName_;
std::shared_ptr broker_;
};
template
Publisher::Publisher(
std::shared_ptr broker,
std::string topicName):
topicName_(std::move(topicName)), broker_(std::move(broker)) {
};
template
MessageStatus Publisher::PublishMessage(
const std::string topicName,
std::shared_ptr message) {
std::cout << "publishing message " << message << std::endl;
broker_->template PublishMessage(std::move(topicName), std::move(message));
return MessageStatus::QUEUED;
};
} // namespace node
} // namespace tareeqav下面继承的基管道类
#pragma once
#include
#include
#include "tareeqav/messaging/broker.h"
namespace tareeqav {
using tareeqav::messaging::Broker;
class Pipeline {
public:
Pipeline() = delete;
Pipeline(std::shared_ptr broker) : broker_(std::move(broker)) {};
protected:
std::shared_ptr broker_ = nullptr;
};
}此管道是测试消息的使用者。管道既可以是消息的消费者,也可以是消息的生产者。
#pragma once
#include
#include "tareeqav/pipelines/pipeline.h"
#include "tareeqav/messaging/proto/control_message.h"
#include "tareeqav/pipelines/perception/control_subscriber.h"
namespace tareeqav {
namespace perception {
using std::placeholders::_1;
class PerceptionPipeline : public Pipeline {
public:
PerceptionPipeline(const std::shared_ptr broker) : Pipeline(std::move(broker)) {
std::string topicName = std::string("/control_message");
subscriber_ = std::make_shared<
nodes::Subscriber>(broker, topicName);
controlCallback = std::make_shared();
subscriber_->AddSubscription(
[this](auto &&T) { controlCallback->MessageCallback(std::forward(T)); }
);
};
private:
std::shared_ptr subscriber_ = nullptr;
std::shared_ptr controlCallback = nullptr;
};
} // namespace perception
} // namespace tareeqav此管道发布一条消息。
#pragma once
#include "tareeqav/pipelines/pipeline.h"
#include "tareeqav/node/publisher.h"
#include "tareeqav/messaging/proto/control_message.h"
namespace tareeqav {
namespace planning {
class PlanningPipeline : public Pipeline {
public:
PlanningPipeline(std::shared_ptr broker) : Pipeline(std::move(broker)) {
std::string topicName = std::string("/control_message");
controlPublisher_ = std::make_shared>(
broker_, topicName);
controlPublisher_->CreateTopic();
auto message = std::make_shared(.9, .1);
controlPublisher_->PublishMessage(topicName, message);
};
private:
std::shared_ptr> controlPublisher_ = nullptr;
};
} // namespace perception
} // namespace tareeqav控制消息(它将很快被protobuf所取代)
#pragma once
namespace tareeqav
{
namespace messaging
{
class ControlMessage
{
public:
ControlMessage(){};
ControlMessage(float a, float s): acceleration(a), steering(s){};
float acceleration;
float steering;
};
} // namespace perception
} // namespace tareeqav这是带有入口点的司机。
#include
#include "tareeqav/messaging/broker.h"
#include "tareeqav/pipelines/planning/planning.h"
#include "tareeqav/pipelines/perception/perception.h"
using tareeqav::messaging::Broker;
using tareeqav::planning::PlanningPipeline;
using tareeqav::perception::PerceptionPipeline;
int main(int argc, char *argv[]) {
auto broker = std::make_shared();
auto planningPipeline = std::make_shared(PlanningPipeline(broker));
auto perceptionPipeline = std::make_shared(PerceptionPipeline(broker));
while (true) {};
return 0;
}发布于 2022-08-02 19:54:18
https://codereview.stackexchange.com/questions/278528
复制相似问题