考虑一个阻塞函数:this_thread::sleep_for(毫秒(3000));
我试图得到以下行为:
Trigger Blocking Function
|---------------------------------------------X我想触发阻塞函数,如果时间太长(超过2秒),就应该超时。
我做了以下工作:
my_connection = observable<>::create<int>([](subscriber<int> s) {
auto s2 = observable<>::just(1, observe_on_new_thread()) |
subscribe<int>([&](auto x) {
this_thread::sleep_for(milliseconds(3000));
s.on_next(1);
});
}) |
timeout(seconds(2), observe_on_new_thread());我不能让它起作用。首先,我认为S不能从另一个线程中on_next。
所以我的问题是,正确的反应方式是什么?如何在rxcpp中包装阻塞函数并向其添加超时?
随后,我想获得一个行为如下的RX流:
Trigger Cleanup
|------------------------X
(Delay) Trigger Cleanup
|-----------------X发布于 2017-07-12 18:40:42
问得好!上面的情况非常接近。
下面是如何将阻塞操作调整到rxcpp的示例。它使用libcurl轮询来发出http请求。
以下是你想做的事。
auto sharedThreads = observe_on_event_loop();
auto my_connection = observable<>::create<int>([](subscriber<int> s) {
this_thread::sleep_for(milliseconds(3000));
s.on_next(1);
s.on_completed();
}) |
subscribe_on(observe_on_new_thread()) |
//start_with(0) | // workaround bug in timeout
timeout(seconds(2), sharedThreads);
//skip(1); // workaround bug in timeout
my_connection.as_blocking().subscribe(
[](int){},
[](exception_ptr ep){cout << "timed out" << endl;}
);subscribe_on将在一个专用线程上运行create,因此允许create阻塞该线程。on_next/on_error/on_completed将在另一个线程上运行定时器,这个线程可以与其他线程共享,并将所有的timeout调用传输到同一个线程。as_blocking将确保subscribe在完成之前不会返回。这只用于防止main()退出--通常是在测试或示例程序中。编辑:为timeout中的bug添加了解决方法。目前,在第一个值到达之前,它不会安排第一个超时。
编辑2:timeout错误已经修复,解决方案不再需要了。
https://stackoverflow.com/questions/45051166
复制相似问题