首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >利用mergeMap分解和变换重建未完成的可观测阵列

利用mergeMap分解和变换重建未完成的可观测阵列
EN

Stack Overflow用户
提问于 2018-03-01 14:50:41
回答 1查看 292关注 0票数 1

我有一个主题与publishReplay和反计数,所以它实际上从来没有完成。折射计数的输出是一个可观察的对象数组。我希望将可观察到的对象数组分解为发出的元素,根据需要进行转换,然后将它们合并回数组,但是由于源未完成,数组边界在异步性质中“丢失”。

例如,我使用mergeMap和扫描来积累数据。但是,正如预期的那样,它会不断地累积,因此得到的数组是流的整个历史记录。扫描的初始值仅在流的开始处初始化。我不能使用toArray,因为可观察的永远不会终止。

我在异步硬件设计方面有很多经验。这个异步链的硬件类比是使用锁存器。我不知道在RxJS中概念上的等价性是什么。我假设从refcount获取发出的数组输出并将其应用到Observable.from(theArrayOutput)中,但我不知道如何将其插入流链中。

代码语言:javascript
复制
import {Component, OnInit} from '@angular/core';
import {Observable} from 'rxjs/Observable';
import {Subject} from 'rxjs/Subject';

type IObjectsOperation = (types: Object[]) => Object[];

@Component({
    selector: 'app-events-test',
    templateUrl: './events-test.component.html',
    styleUrls: ['./events-test.component.css']
})

export class EventsTestComponent implements OnInit {
    public objects: Observable<Object[]>;
    public scanned: Observable<any>;
    protected updates: Subject<any> = new Subject<any>();
    protected functionStream: Subject<any> = new Subject<any>();
    protected addStream: Subject<Object> = new Subject<Object>();
    private initialObjects: Object[] = [1];

    constructor() {
        this.objects = this.updates
            .scan((objects: Object[],
                   operation: IObjectsOperation) => {
                    return operation(objects);
                },
                this.initialObjects)
            .publishReplay(1)
            .refCount();
        this.functionStream
            .map(function (message: Object): IObjectsOperation {
                return (messages: Object[]) => {
                    return messages.concat(message);
                };
            })
            .subscribe(this.updates);
        this.addStream.subscribe(this.functionStream);
        this.scanned = this.objects
            .mergeMap(val => val)
            // some transformation that I want to have happen, in this case no-op
            .filter(() => {return true})
            // attempt to rebuild array, but items are accumulated
            .scan( (acc: Array<any>, x: Object) => { return acc.concat(x); }, [])

    }


transform(objects) {
    return Observable.from(objects)
        // this withLatestFrom suggestion didn't work
        // .withLatestFrom( this.functionStream, ( val, fn ) => fn( val ) )
        .filter(() => {
            return true
        })
        .toArray()
}


    start(): void {
        console.log('---------STARTING');
        this.objects.mergeMap(obj => this.transform(obj))
        .subscribe(
            obj => console.log('objects: ' + obj)
        );
        this.scanned.subscribe(
            {
                next: obj => {
                    console.log('scanned: ' + obj);
                },
                error: () => {
                },
                complete: () => console.log('COMPLETED')
            }
        );
        this.add(2);
        this.add(3);
    }
    add(object: Object): void {
        this.addStream.next(object);
    }
    ngOnInit() {
        this.start();
    }

}

输出低于预期值。

代码语言:javascript
复制
---------STARTING
events-test.component.ts:49 objects: 1,2
events-test.component.ts:52 scanned: 1
events-test.component.ts:52 scanned: 1,2
events-test.component.ts:49 objects: 1,2,3
events-test.component.ts:52 scanned: 1,2,1
events-test.component.ts:52 scanned: 1,2,1,2
events-test.component.ts:52 scanned: 1,2,1,2,3

我想看到的是:

代码语言:javascript
复制
---------STARTING
events-test.component.ts:49 objects: 1,2
events-test.component.ts:52 scanned: 1,2
events-test.component.ts:49 objects: 1,2,3
events-test.component.ts:52 scanned: 1,2,3

我有几个解决办法可以使用:

  1. 将数据作为数组保存在整个流中。
  2. 将未完成的可观察的源转换为完成

我假设这种架构的解决方案是:

  1. 有一些操作我可以插入到流中,从可观察的对象中获取数组,并返回一个可观察到的已完成的数组。
  2. 需要从可观察到的源中产生一些外部信号,它指示‘处理’,然后终止限制扫描。我觉得这很笨拙。

我假设RxJS中有一些与异步硬件锁存器相当的东西,所以我想保留当前的体系结构。此外,我认为RxJS真的很酷,并且希望在基于时间的流处理方面缩小我的知识差距。

编辑:@xtianjohns给出了一个很好的答案,说明了如何执行内部循环,但是订阅仍然没有完成。建议的withLatestFrom添加导致转换函数中断,在注释掉这一行时,数组被呈现,但外部循环没有完成。如果存在行,则不呈现数组,循环也不完成。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-03-01 18:23:49

您的问题来自于使用mergeMap,然后是scan,并在“外部”流上这样做,而不是转换对象列表,然后发出该列表。

另一种方法是:如果您想要的是数组流,那么让我们始终将其保留为数组流。

代码语言:javascript
复制
        /* Function that takes array and returns stream of transformed arrays */
        function transform( objects ) {
          return Observable.from( objects )
            /* We only need this because your transformation is wrapped in a stream. */
            .withLatestFrom( this.functionStream, ( val, fn ) => fn( val ) )
            // some transformation that I want to have happen, in this case no-op
            .filter(() => {return true});
            .toArray();
        }

        this.scanned = this.objects
          /* let's keep this as Observable<Array<YourObject>> */
          .mergeMap( transform );
          /* No need to "rebuild" array, this is now a stream of arrays */

请注意,在本例中,我如何在transform中“转换”数组的元素,而不是试图在流中“展开”数组,然后重新组合。

这符合你的用例吗?

编辑:我在阅读你的functionStream签名时犯了错误,已经更正了这一点。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49052292

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档