有没有办法在运行时改变Flowable.interval周期?
LOGGER.info("Start generating bullshit for 7 seconds:");
Flowable.interval(3, TimeUnit.SECONDS)
.map(tick -> random.nextInt(100))
.subscribe(tick -> LOGGER.info("tick = " + tick));
TimeUnit.SECONDS.sleep(7);
LOGGER.info("Change interval to 2 seconds:");发布于 2020-06-07 03:11:16
我有一个变通方法,但最好的方法是创建一个新的操作符。
这个解决方案是如何工作的?
您有一个触发源,它将提供值,何时开始开始一个新的间隔。源为switchMapped,间隔为内流。内部流接受上行源的输入值,用于设置新的间隔时间。
switchMap
当源发出一段时间(长时间)时,将调用switchMapλ,并立即订阅返回的可流动对象。当新值到达switchMap时,内部订阅的可流动间隔将被取消订阅,并再次调用lambda。返回的Inverval-Flowable将重新订阅。
这意味着,每次从源发出时,都会创建一个新的Inveral。
它的行为是怎样的?
当inveral被订阅并即将发出一个新值,并且从源发出一个新值时,内部流(inverval)将被取消订阅。因此,不再发出该值。新的Interval-Flowable被订阅,并将向其配置发出一个值。
解决方案
lateinit var scheduler: TestScheduler
@Before
fun init() {
scheduler = TestScheduler()
}
@Test
fun `62232235`() {
val trigger = PublishSubject.create<Long>()
val switchMap = trigger.toFlowable(BackpressureStrategy.LATEST)
// make sure, that a value is emitted from upstream, in order to make sure, that at least one interval emits values, when the upstream-sources does not provide a seed value.
.startWith(3)
.switchMap {
Flowable.interval(it, TimeUnit.SECONDS, scheduler)
.map { tick: Long? ->
tick
}
}
val test = switchMap.test()
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
test.assertValues(0, 1, 2)
// send new onNext value at absolute time 10
trigger.onNext(10)
// the inner stream is unsubscribed and a new stream with inverval(10) is subscribed to. Therefore the first vale will be emitted at 20 (current: 10 + 10 configured)
scheduler.advanceTimeTo(21, TimeUnit.SECONDS)
// if the switch did not happen, there would be 7 values
test.assertValues(0, 1, 2, 0)
}https://stackoverflow.com/questions/62232235
复制相似问题