首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将有限流的无限流转换为无限流-反应X

将有限流的无限流转换为无限流-反应X
EN

Stack Overflow用户
提问于 2017-09-03 01:18:53
回答 1查看 576关注 0票数 1

如何在反应式x中(最好是RxJava或RxJ中的示例)实现这一点?

代码语言:javascript
复制
a |-a-------------------a-----------a-----------a----
s1 |-x-x-x-x-x-x -| (subscribe)
s2                       |-x-x-x-x-x-| (subscribe)
s2                                               |-x-x-x-x-x-| (subscribe)
...
sn
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe)

a是无限的事件流,它触发有限的事件流sn,每个事件流都应该是无限流S的一部分,同时能够订阅每个sn流(以便进行求和操作),但同时保持流S为无限。

编辑:更具体地说,我提供了我在Kotlin中寻找的实现。每10秒就会发出一个事件,该事件映射到共享的4个事件的有限流。转换流是flatMap-ed成正常的无限流。我利用doAfterNext来另外订阅每个有限的流并打印结果。

代码语言:javascript
复制
/** Creates a finite stream with events
 * $ch-1 - $ch-4
 */
fun createFinite(ch: Char): Observable<String> =
        Observable.interval(1, TimeUnit.SECONDS)
                .take(4)
                .map({ "$ch-$it" }).share()

fun main(args: Array<String>) {

    var ch = 'A'

    Observable.interval(10, TimeUnit.SECONDS).startWith(0)
            .map { createFinite(ch++) }
            .doAfterNext {
                it
                        .count()
                        .subscribe({ c -> println("I am done. Total event count is $c") })
            }
            .flatMap { it }
            .subscribe { println("Just received [$it] from the infinite stream ") }

    // Let main thread wait forever
    CountDownLatch(1).await()
}

然而,我不确定这是否是“纯粹的RX”方式。

EN

回答 1

Stack Overflow用户

发布于 2017-09-05 00:41:22

你不清楚你想要如何计算。如果您正在进行总计数,则不需要进行内部订阅:

代码语言:javascript
复制
AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
        .map { createFinite(ch++) }
        .flatMap { it }
        .doOnNext( counter.incrementAndget() )
        .subscribe { println("Just received [$it] from the infinite stream ") }

另一方面,如果您需要为每个中间可观察对象提供计数,则可以将计数移动到flatMap()中,并打印出计数并在完成时重置计数:

代码语言:javascript
复制
AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
        .map { createFinite(ch++) }
        .flatMap { it
                     .doOnNext( counter.incrementAndget()
                     .doOnCompleted( { long ctr = counter.getAndSet(0)
                                        println("I am done. Total event count is $ctr")
                                     } )
        .subscribe { println("Just received [$it] from the infinite stream ") }

这不是很实用,但这种报告往往会破坏正常的流。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46016305

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档