首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >gRPC和etcd客户端

gRPC和etcd客户端
EN

Stack Overflow用户
提问于 2018-07-18 05:59:09
回答 1查看 1.3K关注 0票数 1

这个问题涉及到etcd特定的内容,但我认为这个问题更多地与gRPC的一般工作有关。我试图为一些键创建etcd Watch,因为文档很少,所以我看了一下诺基亚实现 --它很容易使代码适应我的需求,我想出了第一个版本,它工作得很好,创建了WatchCreateRequest,并启动了键更新回调。到目前一切尚好。然后我试着增加一个以上的键来观看。在这种情况下,ClientAsyncReaderWriter没有读/写。现在来问问题。

如果我在班上有以下成员

代码语言:javascript
复制
Watch::Stub watchStub;
CompletionQueue completionQueue;
ClientContext context;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest, WatchResponse>> stream;
WatchResponse reply;

而且我想支持添加到我的类中的多个Watches,我想我必须在每次观察中保存几个变量,而不是作为类成员。首先,我想,WatchResponse reply应该是每Watch一个。我对stream不太确定,我应该为每个Watch拿一个吗?我几乎可以肯定,context可以在所有Watches中重用,并且100%肯定stubcompletionQueue可以用于所有Watches。所以问题是我的猜测-工作对吧?线程安全是什么?没有找到任何文档来描述哪些对象可以安全地从多个线程中使用,以及我必须在哪里同步访问。任何指向文档(不是这个)的链接都将不胜感激!

在将成员拆分为单个Watch属性之前测试代码(我知道没有正确的关闭)

代码语言:javascript
复制
using namespace grpc;
class Watcher
{
public:
    using Callback = std::function<void(const std::string&, const std::string&)>;

    Watcher(std::shared_ptr<Channel> channel) : watchStub(channel)
    {
        stream = watchStub.AsyncWatch(&context, &completionQueue, (void*) "create");
        eventPoller = std::thread([this]() { WaitForEvent(); });
    }

    void AddWatch(const std::string& key, Callback callback)
    {
        AddWatch(key, callback, false);
    }

    void AddWatches(const std::string& key, Callback callback)
    {
        AddWatch(key, callback, true);
    }

private:
    void AddWatch(const std::string& key, Callback callback, bool isRecursive)
    {
        auto insertionResult = callbacks.emplace(key, callback);
        if (!insertionResult.second) {
            throw std::runtime_error("Event handle already exist.");
        }
        WatchRequest watch_req;
        WatchCreateRequest watch_create_req;
        watch_create_req.set_key(key);
        if (isRecursive) {
            watch_create_req.set_range_end(key + "\xFF");
        }

        watch_req.mutable_create_request()->CopyFrom(watch_create_req);
        stream->Write(watch_req, (void*) insertionResult.first->first.c_str());

        stream->Read(&reply, (void*) insertionResult.first->first.c_str());
    }

    void WaitForEvent()
    {
        void* got_tag;
        bool ok = false;

        while (completionQueue.Next(&got_tag, &ok)) {
            if (ok == false) {
                break;
            }
            if (got_tag == (void*) "writes done") {
                // Signal shutdown
            }
            else if (got_tag == (void*) "create") {
            }
            else if (got_tag == (void*) "write") {
            }
            else {

                auto tag = std::string(reinterpret_cast<char*>(got_tag));
                auto findIt = callbacks.find(tag);
                if (findIt == callbacks.end()) {
                    throw std::runtime_error("Key \"" + tag + "\"not found");
                }

                if (reply.events_size()) {
                    ParseResponse(findIt->second);
                }
                stream->Read(&reply, got_tag);
            }
        }
    }

    void ParseResponse(Callback& callback)
    {
        for (int i = 0; i < reply.events_size(); ++i) {
            auto event = reply.events(i);
            auto key = event.kv().key();
            callback(event.kv().key(), event.kv().value());
        }
    }

    Watch::Stub watchStub;
    CompletionQueue completionQueue;
    ClientContext context;
    std::unique_ptr<ClientAsyncReaderWriter<WatchRequest, WatchResponse>> stream;
    WatchResponse reply;
    std::unordered_map<std::string, Callback> callbacks;
    std::thread eventPoller;
};
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-07-18 23:28:14

对不起,我不太确定合适的Watch设计在这里。我不太清楚您是否希望为每个gRPC创建一个Watch调用。

无论如何,每个gRPC调用都有自己的ClientContextClientAsyncReaderWriter。但stubCompletionQueue并不是随叫随到的事情。

据我所知,没有找到线程安全类的中心位置。您可能希望阅读API文档以获得正确的期望。

在编写异步服务器负载报告服务时,我自己添加同步的惟一位置是围绕CompletionQueue,这样如果cq被关闭,就不会对新标记进行排队。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51394558

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档