在编写时,我意识到在自己的线程中有多个类轮询文件描述符--很多重复的代码,比如同步、创建pollfd数组等等。
我决定通过编写下面的类来删除重复的代码,这个类负责在后台轮询fds。有什么想法吗?
template
class Poller
{
public:
using storage_type = std::vector;
using callbacks_type = std::map;
explicit Poller(ReadFn&& readFn) : readFn(std::forward(readFn)) {}
~Poller()
{
if (worker.joinable())
{
executeInContext([](storage_type& storage, callbacks_type& callbacks){
storage.erase(storage.begin()+1, storage.end());
callbacks.clear();
});
worker.join();
close(_fd[0]);
close(_fd[1]);
}
}
void addDescriptor(int fd, Callback cb)
{
if (!worker.joinable())
{
pipe(_fd);
_storage.push_back({_fd[0], POLLPRI | POLLIN, 0});
_storage.push_back({fd, POLLPRI | POLLIN, 0});
_callbacks[fd] = cb;
worker = std::thread([this]{threadFunc();});
}
else
{
executeInContext([fd, cb](storage_type& storage, callbacks_type& callbacks){
auto it = std::find_if(storage.begin(), storage.end(),
[fd](struct pollfd i){ return fd == i.fd; });
if (it == storage.end())
{
storage.push_back({fd, POLLPRI | POLLIN, 0});
}
callbacks[fd] = cb;
});
}
}
void removeDescriptor(int fd)
{
executeInContext([fd](storage_type& storage, callbacks_type& callbacks){
if (auto it = std::find_if(storage.begin(), storage.end(), [fd](struct pollfd i){
return i.fd == fd;
}); it != storage.end())
{
callbacks.erase(it->fd);
storage.erase(it);
}
});
if (_storage.size() == 1 && worker.joinable())
{
worker.join();
close(_fd[0]);
close(_fd[1]);
_storage.clear();
}
}
template
void executeInContext(Callable&& func)
{
::write(_fd[1], "suse\0", 5);
{
std::unique_lock lock(mutex);
std::invoke(std::forward(func),
std::ref(_storage), std::ref(_callbacks));
}
cv.notify_all();
}
protected:
void threadFunc()
{
std::unique_lock lock(mutex);
while (true)
{
if (int rv = poll(_storage.data(), _storage.size(), -1); rv > 0)
{
/* Pipe sync request */
if (_storage.begin()->revents)
{
char buff[0x10];
::read(_fd[0], buff, sizeof(buff));
/* Wait for main thread to execute its stuff */
cv.wait(lock);
if (_storage.size() == 1)
{
/* All fds have been removed except the pipe, exit */
break;
}
}
for (auto& pfd : _storage)
{
if (pfd.revents)
{
std::invoke(readFn,
pfd.fd, _callbacks.at(pfd.fd));
}
}
}
else if (errno == EINTR)
{
continue;
}
else
{ /* Debug trace breakpoint */ }
}
}
private:
std::thread worker{};
std::vector _storage{};
std::map _callbacks{};
std::condition_variable cv{};
std::mutex mutex{};
int _fd[2]{};
ReadFn readFn;
};示例用法:
using callback_type = std::function;
auto readfn = [](int fd, callback_type cb){
struct event_structure ev{};
::read(fd, &ev, sizeof(ev));
std::invoke(cb, ev.property);
};
Poller poller(readfn);
int fd = /* open FD */;
poller.addDescriptor(fd, [](int){/* Handle property change */});
/* More descriptors with unique callbacks added/removed during program execution */发布于 2021-09-04 14:41:06
中的线程
按需启动和停止线程需要非常小心,不要触发任何竞赛条件。但是没有理由不让线程一直运行,如果它只是在等待同步fd上的命令时被阻塞,那么它就不会使用任何CPU时间。因此,我建议您在构造函数中启动线程,并在析构函数中停止它。
对::read()和::write()的调用可能失败;请确保正确处理故障。
_callbacks使用std::vector
应该可以将回调存储在std::vector中,这与_storage的顺序相匹配。这样,您不需要进行相当昂贵的查找就可以得到回调;您只需在_storage和_callbacks中同时在threadFunc()中迭代即可。
您所拥有的代码可能适用于您的应用程序,但它非常脆弱。编写它的方式只有在实例化的线程中向Poller添加和删除描述符才是安全的。最重要的是,回调不能安全地删除自身。它将有可能使它更加健壮,而代价是增加更多的复杂性。
原因之一是向_storage中添加项或从threadFunc()中删除项将使任何迭代器失效,包括D20中的for-loop在幕后使用的迭代器。一种可能的解决方案是在从循环中添加或删除项时设置一个标志,在调用回调之后,如果设置了标志,则立即退出循环。
另一个问题是,从轮询线程内部调用executeInContext()将导致死锁。一种潜在的解决方案是让检测executeInContext()是否检测是否从与threadFunc()相同的线程调用它,如果没有,则尝试锁定互斥对象。
https://codereview.stackexchange.com/questions/267668
复制相似问题