首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何等待RxPy并行线程完成

如何等待RxPy并行线程完成
EN

Stack Overflow用户
提问于 2017-05-15 21:33:02
回答 3查看 3.4K关注 0票数 7

基于这个非常好的答案,我可以让多个任务在RxPy中并行工作,我的问题是如何等待它们全部完成?我知道使用线程可以执行.join(),但是Rx调度程序似乎没有任何这样的选项。.to_blocking()也无济于事,MainThread在所有通知被触发并调用完整的处理程序之前就完成了。下面是一个例子:

代码语言:javascript
复制
from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread

def printthread(val):
    print("{}, thread: {}".format(val, current_thread().name))

def intense_calculation(value):
    printthread("calc {}".format(value))
    time.sleep(random.randint(5, 20) * .1)
    return value

if __name__ == "__main__":
    Observable.range(1, 3) \
        .select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=Scheduler.timeout)) \
        .observe_on(Scheduler.event_loop) \
        .subscribe(
            on_next=lambda x: printthread("on_next: {}".format(x)),
            on_completed=lambda: printthread("on_completed"),
            on_error=lambda err: printthread("on_error: {}".format(err)))

    printthread("\nAll done")
    # time.sleep(2)

预期产出

代码语言:javascript
复制
calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
All done, thread: MainThread

实际输出

代码语言:javascript
复制
calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

All done, thread: MainThread

如果取消对睡眠调用的注释,则实际输出

代码语言:javascript
复制
calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

All done, thread: MainThread
on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2017-07-17 08:32:21

对于ThreadPoolScheduler,您可以:

  1. 调度器= ThreadPoolScheduler(pool_size)
  2. 平行呼叫。
  3. scheduler.executor.shutdown()

然后,一旦全部完成,您就可以得到所有的结果。

票数 3
EN

Stack Overflow用户

发布于 2017-07-19 10:59:10

在这里张贴完整的解决方案:

代码语言:javascript
复制
from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread
from rx.concurrency import ThreadPoolScheduler

def printthread(val):
    print("{}, thread: {}".format(val, current_thread().name))

def intense_calculation(value):
    printthread("calc {}".format(value))
    time.sleep(random.randint(5, 20) * .1)
    return value

if __name__ == "__main__":
    scheduler = ThreadPoolScheduler(4)

    Observable.range(1, 3) \
        .select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=scheduler)) \
        .observe_on(Scheduler.event_loop) \
        .subscribe(
            on_next=lambda x: printthread("on_next: {}".format(x)),
            on_completed=lambda: printthread("on_completed"),
            on_error=lambda err: printthread("on_error: {}".format(err)))

    printthread("\nAll done")
    scheduler.executor.shutdown()
    # time.sleep(2)
票数 7
EN

Stack Overflow用户

发布于 2021-08-18 23:40:14

使用run()等待RxPy并行线程完成。

BlockingObservables已从RxPY v3中移除。

代码语言:javascript
复制
from threading import current_thread
import rx, random, multiprocessing, time
from rx import operators as ops

def intense_calculation(value):
   delay = random.randint(5, 20) * 0.2
   time.sleep(delay)
   print("From adding_delay: {0} Value : {1} {2}".format(current_thread(), value, delay))
   return (value[0], value[1]+ " processed")

thread_pool_scheduler = rx.scheduler.NewThreadScheduler()

my_dict={'A':'url1', 'B':'url2', 'C':'url3'}

new_dict = rx.from_iterable(my_dict.items()).pipe(
    ops.flat_map(lambda a: rx.of(a).pipe(
        ops.map(lambda a: intense_calculation(a)),
        ops.subscribe_on(thread_pool_scheduler)
    )),
    ops.to_dict(lambda x: x[0], lambda x: x[1])
).run()

print("From main: {0}".format(current_thread()))
print(str(new_dict))
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43989153

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档