我试着用.subscribe()让可观察到的东西变得火热。
与大多数语言不同,Python的rx v3.0使用管道命令来链接操作-例如replay()、ref_count()或publish() -而不是通常的‘’。链条。以下是指向管道命令的链接:https://rxpy.readthedocs.io/en/latest/migration.html#pipe-based-operator-chaining
我非常确定这个问题与我给replay()的第一个参数有关:lambda x: x。
import rx.subject
from rx import operators as op
stream = rx.subject.BehaviorSubject(0)
replayable_observable = stream.pipe(op.replay(lambda x: x, buffer_size=100))
replayable_observable.subscribe()
stream.subscribe()
for x in [1, 2, 3, 4, 5]:
stream.on_next(x)
replayable_observable.subscribe(lambda value: print("Received {0}".format(value)))
for x in [6, 7, 8, 9, 10]:
stream.on_next(x)我希望收到0-10;或者1-10。但我收到的却是5-10。
Received 5
Received 6
Received 7
Received 8
Received 9
Received 10发布于 2019-08-27 03:06:18
看起来,replay ( mapper函数)的第一个参数旨在允许您在多播之后将更多的操作符链接到源可观测对象上。例如,如果将其从lambda x: x更改为lambda x: x.pipe(op.map(lambda y: y * 2)),则得到的值将加倍。
replay的docs似乎已经过时了,因为他们给出的示例mapper函数仍然使用旧样式的方法链接,而不是管道方法。此外,在此项目的GitHub代码库中,所有实际使用非缺省值作为mapper参数的replay的tests似乎都被注释掉了,因此没有明确的示例来说明如何正确使用此参数。
从源代码来看,我可以说的是,当您使用replay而不指定mapper时,返回的是ConnectableObservable而不是Observable。在将值正确地推送到主题之前连接此ConnectableObservable可以正确地缓冲结果。
import rx.subject
from rx import operators as op
stream = rx.subject.BehaviorSubject(0)
replayable_observable = stream.pipe(op.replay(buffer_size=100))
replayable_observable.connect()
for x in [1, 2, 3, 4, 5]:
stream.on_next(x)
replayable_observable.subscribe(lambda value: print("Received {0}".format(value)))
for x in [6, 7, 8, 9, 10]:
stream.on_next(x)
# Received 0
# Received 1
# Received 2
# Received 3
# Received 4
# Received 5
# Received 6
# Received 7
# Received 8
# Received 9
# Received 10https://stackoverflow.com/questions/57602436
复制相似问题