如何在反应式x中(最好是RxJava或RxJ中的示例)实现这一点?
a |-a-------------------a-----------a-----------a----
s1 |-x-x-x-x-x-x -| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
...
sn
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe)a是无限的事件流,它触发有限的事件流sn,每个事件流都应该是无限流S的一部分,同时能够订阅每个sn流(以便进行求和操作),但同时保持流S为无限。
编辑:更具体地说,我提供了我在Kotlin中寻找的实现。每10秒就会发出一个事件,该事件映射到共享的4个事件的有限流。转换流是flatMap-ed成正常的无限流。我利用doAfterNext来另外订阅每个有限的流并打印结果。
/** Creates a finite stream with events
* $ch-1 - $ch-4
*/
fun createFinite(ch: Char): Observable<String> =
Observable.interval(1, TimeUnit.SECONDS)
.take(4)
.map({ "$ch-$it" }).share()
fun main(args: Array<String>) {
var ch = 'A'
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
.map { createFinite(ch++) }
.doAfterNext {
it
.count()
.subscribe({ c -> println("I am done. Total event count is $c") })
}
.flatMap { it }
.subscribe { println("Just received [$it] from the infinite stream ") }
// Let main thread wait forever
CountDownLatch(1).await()
}然而,我不确定这是否是“纯粹的RX”方式。
发布于 2017-09-05 00:41:22
你不清楚你想要如何计算。如果您正在进行总计数,则不需要进行内部订阅:
AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
.map { createFinite(ch++) }
.flatMap { it }
.doOnNext( counter.incrementAndget() )
.subscribe { println("Just received [$it] from the infinite stream ") }另一方面,如果您需要为每个中间可观察对象提供计数,则可以将计数移动到flatMap()中,并打印出计数并在完成时重置计数:
AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
.map { createFinite(ch++) }
.flatMap { it
.doOnNext( counter.incrementAndget()
.doOnCompleted( { long ctr = counter.getAndSet(0)
println("I am done. Total event count is $ctr")
} )
.subscribe { println("Just received [$it] from the infinite stream ") }这不是很实用,但这种报告往往会破坏正常的流。
https://stackoverflow.com/questions/46016305
复制相似问题