首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RxJ将流拆分为多个流

RxJ将流拆分为多个流
EN

Stack Overflow用户
提问于 2017-10-19 18:40:51
回答 4查看 8.5K关注 0票数 9

如何根据分组方法将永续流拆分为多个结束流?

代码语言:javascript
复制
--a--a-a-a-a-b---b-b--b-c-c---c-c-d-d-d-e...>

进入这些观测点

代码语言:javascript
复制
--a--a-a-a-a-|
             b---b-b--b-|
                        c-c---c-c-|
                                  d-d-d-|
                                        e...>

正如您所看到的,a在开始,在我收到b之后,我将不再获得a,因此应该结束它。这就是为什么正常的groupBy不好的原因。

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2017-10-19 20:08:00

您可以使用可观察到的源windowsharebufferCount(2, 1)还有一个小窍门

代码语言:javascript
复制
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();

source
    .bufferCount(2, 1) // delay emission by one item
    .map(arr => arr[0])
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);

此打印(由于toArray()):

代码语言:javascript
复制
[ 'a', 'a', 'a', 'a', 'a' ]
[ 'b', 'b', 'b', 'b' ]
[ 'c', 'c', 'c', 'c' ]
[ 'd', 'd', 'd' ]
[ 'e' ]

此解决方案的问题是订阅source的顺序。我们需要window通知程序在第一个bufferCount之前进行订阅。否则,首先进一步推送一个项,然后检查它是否与使用.filter(([oldValue, newValue]) ...)的前一个项不同。

这意味着需要在window之前延迟发射一个(这是第一个.bufferCount(2, 1).map(arr => arr[0]) )。

或者我自己使用publish()更容易控制订阅顺序

代码语言:javascript
复制
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();

const connectable = source.publish();

connectable
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);

connectable.connect();

输出是相同的。

票数 6
EN

Stack Overflow用户

发布于 2017-10-19 19:53:23

也许有人可以想出一些更简单的东西,但这是可行的(小提琴:https://fiddle.jshell.net/uk01njgc/)。

代码语言:javascript
复制
let counter = 0;

let items = Rx.Observable.interval(1000)
.map(value => Math.floor(value / 3))
.publish();

let distinct = items.distinctUntilChanged()
.publish();

distinct
.map(value => {
  return items
  .startWith(value)
  .takeUntil(distinct);
})
.subscribe(obs => {
  let obsIndex = counter++;
  console.log('New observable');
  obs.subscribe(
    value => {
      console.log(obsIndex.toString() + ': ' + value.toString());
    },
    err => console.log(err),
    () => console.log('Completed observable')
  );
});

distinct.connect();
items.connect();
票数 2
EN

Stack Overflow用户

发布于 2017-10-19 20:17:37

这里有一个变体,把你所有的征兵分享.

代码语言:javascript
复制
const stream = ...;

// an Observable<Observable<T>>
// each inner observable completes when the value changes
const split = Observable
  .create(o => {
    const connected = stream.publish();

    // signals each time the values change (ignore the initial value)
    const newWindowSignal = connected.distinctUntilChanged().skip(1);

    // send the observables to our observer
    connected.window(newWindowSignal).subscribe(o);

    // now "start"
    return connected.connect();
  });
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46836738

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档