首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在帮助线程中轮询FDs,动态添加/删除描述符

在帮助线程中轮询FDs,动态添加/删除描述符
EN

Code Review用户
提问于 2021-09-04 06:55:04
回答 1查看 235关注 0票数 2

在编写时,我意识到在自己的线程中有多个类轮询文件描述符--很多重复的代码,比如同步、创建pollfd数组等等。

我决定通过编写下面的类来删除重复的代码,这个类负责在后台轮询fds。有什么想法吗?

代码语言:javascript
复制
template
class Poller
{
public:
    using storage_type = std::vector;
    using callbacks_type = std::map;

    explicit Poller(ReadFn&& readFn) : readFn(std::forward(readFn)) {}
    ~Poller()
    {
        if (worker.joinable())
        {
            executeInContext([](storage_type& storage, callbacks_type& callbacks){
                storage.erase(storage.begin()+1, storage.end());
                callbacks.clear();
            });

            worker.join();
            close(_fd[0]);
            close(_fd[1]);
        }
    }

    void addDescriptor(int fd, Callback cb)
    {
        if (!worker.joinable())
        {
            pipe(_fd);
            _storage.push_back({_fd[0], POLLPRI | POLLIN, 0});
            _storage.push_back({fd, POLLPRI | POLLIN, 0});
            _callbacks[fd] = cb;

            worker = std::thread([this]{threadFunc();});
        }
        else
        {
            executeInContext([fd, cb](storage_type& storage, callbacks_type& callbacks){
                auto it = std::find_if(storage.begin(), storage.end(),
                                       [fd](struct pollfd i){ return fd == i.fd; });
                if (it == storage.end())
                {
                    storage.push_back({fd, POLLPRI | POLLIN, 0});
                }
                callbacks[fd] = cb;
            });
        }
    }

    void removeDescriptor(int fd)
    {
        executeInContext([fd](storage_type& storage, callbacks_type& callbacks){
            if (auto it = std::find_if(storage.begin(), storage.end(), [fd](struct pollfd i){
                                            return i.fd == fd;
                                        }); it != storage.end())
            {
                callbacks.erase(it->fd);
                storage.erase(it);
            }
        });

        if (_storage.size() == 1 && worker.joinable())
        {
            worker.join();

            close(_fd[0]);
            close(_fd[1]);
            _storage.clear();
        }
    }

    template
    void executeInContext(Callable&& func)
    {
        ::write(_fd[1], "suse\0", 5);
        {
            std::unique_lock lock(mutex);
            std::invoke(std::forward(func),
                        std::ref(_storage), std::ref(_callbacks));
        }
        cv.notify_all();
    }

protected:
    void threadFunc()
    {
        std::unique_lock lock(mutex);
        while (true)
        {
            if (int rv = poll(_storage.data(), _storage.size(), -1); rv > 0)
            {
                /* Pipe sync request */
                if (_storage.begin()->revents)
                {
                    char buff[0x10];
                    ::read(_fd[0], buff, sizeof(buff));

                    /* Wait for main thread to execute its stuff */
                    cv.wait(lock);

                    if (_storage.size() == 1)
                    {
                        /* All fds have been removed except the pipe, exit */
                        break;
                    }
                }

                for (auto& pfd : _storage)
                {
                    if (pfd.revents)
                    {
                        std::invoke(readFn,
                                    pfd.fd, _callbacks.at(pfd.fd));
                    }
                }
            }
            else if (errno == EINTR)
            {
                continue;
            }
            else
            { /* Debug trace breakpoint */ }
        }
    }

private:
    std::thread worker{};
    std::vector _storage{};
    std::map _callbacks{};
    std::condition_variable cv{};
    std::mutex mutex{};
    int _fd[2]{};

    ReadFn readFn;
};

示例用法:

代码语言:javascript
复制
using callback_type = std::function;
auto readfn = [](int fd, callback_type cb){
    struct event_structure ev{};
    ::read(fd, &ev, sizeof(ev));

    std::invoke(cb, ev.property);
};
Poller poller(readfn);

int fd = /* open FD */;
poller.addDescriptor(fd, [](int){/* Handle property change */});
/* More descriptors with unique callbacks added/removed during program execution */
EN

回答 1

Code Review用户

回答已采纳

发布于 2021-09-04 14:41:06

启动构造函数

中的线程

按需启动和停止线程需要非常小心,不要触发任何竞赛条件。但是没有理由不让线程一直运行,如果它只是在等待同步fd上的命令时被阻塞,那么它就不会使用任何CPU时间。因此,我建议您在构造函数中启动线程,并在析构函数中停止它。

缺失错误处理

::read()::write()的调用可能失败;请确保正确处理故障。

_callbacks

使用std::vector

应该可以将回调存储在std::vector中,这与_storage的顺序相匹配。这样,您不需要进行相当昂贵的查找就可以得到回调;您只需在_storage_callbacks中同时在threadFunc()中迭代即可。

线程安全

您所拥有的代码可能适用于您的应用程序,但它非常脆弱。编写它的方式只有在实例化的线程中向Poller添加和删除描述符才是安全的。最重要的是,回调不能安全地删除自身。它将有可能使它更加健壮,而代价是增加更多的复杂性。

原因之一是向_storage中添加项或从threadFunc()中删除项将使任何迭代器失效,包括D20中的for-loop在幕后使用的迭代器。一种可能的解决方案是在从循环中添加或删除项时设置一个标志,在调用回调之后,如果设置了标志,则立即退出循环。

另一个问题是,从轮询线程内部调用executeInContext()将导致死锁。一种潜在的解决方案是让检测executeInContext()是否检测是否从与threadFunc()相同的线程调用它,如果没有,则尝试锁定互斥对象。

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/267668

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档