首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用RxPY interval observable定期调用异步协程?

如何使用RxPY interval observable定期调用异步协程?
EN

Stack Overflow用户
提问于 2019-06-22 05:33:13
回答 1查看 2.7K关注 0票数 4

我需要创建一个可观察的流,它以固定的时间间隔发出异步协程的结果。

intervalRead是一个返回可观察值的函数,它以间隔rate和异步协同程序函数fun作为参数,需要在定义的间隔调用该函数。

我的第一个方法是使用interval工厂方法创建一个Observable,然后使用map调用协程,使用from_future将其包装在一个observable中,然后获取协程返回的值。

代码语言:javascript
复制
async def foo():
    await asyncio.sleep(1)
    return 42

def intervalRead(rate, fun) -> Observable:
    loop = asyncio.get_event_loop()
    return rx.interval(rate).pipe(
        map(lambda i: rx.from_future(loop.create_task(fun()))),
    )

async def main():
    obs = intervalRead(5, foo)
    obs.subscribe(
        on_next= lambda item: print(item)
    )

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()

然而,我得到的输出不是协程的结果,而是from_future返回的以指定时间间隔发出的可观测值

输出:<rx.core.observable.observable.Observable object at 0x033B5650>

如何才能获得该可观察对象返回的实际值?我预计会有42

我的第二个方法是创建一个自定义的observable:

代码语言:javascript
复制
def intervalRead(rate, fun) -> rx.Observable:
    interval = rx.interval(rate)
    def subs(observer: Observer, scheduler = None):
        loop = asyncio.get_event_loop()
        def on_timer(i):
            task = loop.create_task(fun())
            from_future(task).subscribe(
                on_next= lambda i: observer.on_next(i),
                on_error= lambda e: observer.on_error(e),
                on_completed= lambda: print('coro completed')
            )
        interval.subscribe(on_next= on_timer, on_error= lambda e: print(e))        
    return rx.create(subs)

但是,在订阅时,from_future(task)永远不会发送值,为什么会发生这种情况?

但是,如果我像这样编写intervalRead

代码语言:javascript
复制
def intervalRead(rate, fun):
    loop = asyncio.get_event_loop()
    task = loop.create_task(fun())
    return from_future(task)

我得到了预期的结果:42。显然,这不能解决我的问题,但它让我感到困惑,为什么它在我的第二种方法中不起作用?

最后,我尝试了使用rx.concurrency CurrentThreadScheduler的第三种方法,并使用schedule_periodic方法定期安排一个操作。然而,我面临着与第二种方法相同的问题。

代码语言:javascript
复制
def funWithScheduler(rate, fun):
    loop = asyncio.get_event_loop()
    scheduler = CurrentThreadScheduler()
    subject = rx.subjects.Subject()
    def action(param):
        obs = rx.from_future(loop.create_task(fun())).subscribe(
            on_next= lambda item: subject.on_next(item),
            on_error= lambda e: print(f'error in action {e}'),
            on_completed= lambda: print('action completed')
        )     
        obs.dispose()   
    scheduler.schedule_periodic(rate,action)
    return subject

我会感谢任何洞察力,我错过了什么或任何其他建议,以完成我需要的。这是我使用asyncio和RxPY的第一个项目,我只在angular项目的上下文中使用过RxJS,所以欢迎任何帮助。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-06-24 05:28:51

您的第一个示例几乎可以工作。只需进行两个更改即可使其正常工作:

首先,from_future的结果是一个发出单个项(完成时的未来值)的可观察对象。因此,map的输出是一个高阶可观量(发出可观测值的可观测值)。可以在map之后使用merge_all运算符,或使用flat_map而不是map来展平这些子可观察对象。

然后,间隔操作符必须在AsyncIO循环上调度它的计时器,这不是默认的情况:默认的调度程序是TimeoutScheduler,它会产生一个新的线程。因此,在原始代码中,不能在AsyncIO事件循环上调度任务,因为create_task是从另一个线程调用的。在调用subscribe时使用scheduler参数将声明用于整个操作符链的默认调度器。

以下代码有效(每5秒打印一次42):

代码语言:javascript
复制
import asyncio
import rx
import rx.operators as ops
from rx.scheduler.eventloop import AsyncIOScheduler


async def foo():
    await asyncio.sleep(1)
    return 42


def intervalRead(rate, fun) -> rx.Observable:
    loop = asyncio.get_event_loop()
    return rx.interval(rate).pipe(
        ops.map(lambda i: rx.from_future(loop.create_task(fun()))),
        ops.merge_all()
    )


async def main(loop):
    obs = intervalRead(5, foo)
    obs.subscribe(
        on_next=lambda item: print(item),
        scheduler=AsyncIOScheduler(loop)
    )

loop = asyncio.get_event_loop()
loop.create_task(main(loop))
loop.run_forever()
票数 9
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56710707

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档