我的问题最终是关于ruby rx库的,尽管任何语言的任何示例都会很高兴。基本上,我想要的是将每个操作调度到一个现有的事件循环(或线程池)。我想这必须由一个调度器来完成。我还没有找到任何调度器将递归操作发送到事件循环的示例,这就是为什么我要问这个问题。下面是ruby rx的列表:
https://github.com/ReactiveX/RxRuby/tree/master/lib/rx/concurrency
为什么要使用事件循环?因为我想添加在事件循环中工作的IO操作,并利用并发性。如下所示:
Rx::Observable.from_enumerable(hosts).
map { |h| HTTP.connect(h) }.
map{|host| host.get("http://myservice/somelist.txt") }.
on_next { |html| parse(html).each_line.....} # you get the idea发布于 2016-04-06 23:21:44
这通常是通过调度器完成的,我希望RubyRx端口包含EventloopScheduler。
您可以使用ObserveOn运算符将它们排入队列/进行调度
Rx::Observable.from_enumerable(hosts).
observeOn(els). # you have declared els somewhere else as an EventLoopScheduler instance
map { |h| HTTP.connect(h) }.
map{|host| host.get("http://myservice/somelist.txt") }.
on_next { |html| parse(html).each_line.....} # you get the idea或者,您可以在映射中添加并发
Rx::Observable.from_enumerable(hosts).
observeOn(els). # you have declared els somewhere else as an EventLoopScheduler instance
map { |h| HTTP.connect(h) }.
flatmap{|host| Rx::Observable.start(host.get("http://myservice/somelist.txt"), els) }.
on_next { |html| parse(html).each_line.....} # you get the idea我希望代码可以工作(我是C#/JS)
https://stackoverflow.com/questions/36452607
复制相似问题