首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将多个可观测数据与顺序保持和最大并发性合并?

如何将多个可观测数据与顺序保持和最大并发性合并?
EN

Stack Overflow用户
提问于 2020-11-15 04:52:47
回答 1查看 652关注 0票数 2

我找了一份副本却没有找到。我所拥有的是一个嵌套的可观察的IObservable<IObservable<T>>,我想将它压平成一个IObservable<T>。我不想使用Concat操作符,因为它将对每个内部可观测到的订阅延迟到以前可观测的完成。这是一个问题,因为内部可观测值是冷的,我希望它们在外部可观测到的值发出后立即开始发射T值。我也不想使用Merge操作符,因为它扰乱了发出的值的顺序。下面的大理石图显示了Merge操作符的问题行为(就我的情况而言),以及所希望的合并行为。

代码语言:javascript
复制
Stream of observables: +----1------2-----3----|
Observable-1         :      +--A-----------------B-------|
Observable-2         :             +---C---------------------D------|
Observable-3         :                   +--E--------------------F-------|
Merge (undesirable)  : +-------A-------C----E----B-----------D---F-------|
Desirable merging    : +-------A-----------------B-------C---D------EF---|

由可观测-1发出的所有值都应先于可观测的-2发出的任何值。同样的情况也应该适用于可观测的-2和可观测的-3,等等。

我喜欢Merge操作符的地方是,它允许配置内部可观察到的最大并发订阅。我想用我正在尝试实现的自定义MergeOrdered操作符来保留这个功能。以下是我的在建方法:

代码语言:javascript
复制
public static IObservable<T> MergeOrdered<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency = Int32.MaxValue)
{
    return source.Merge(maximumConcurrency); // How to make it ordered?
}

下面是一个用法示例:

代码语言:javascript
复制
var source = Observable
    .Interval(TimeSpan.FromMilliseconds(300))
    .Take(4)
    .Select(x => Observable
        .Interval(TimeSpan.FromMilliseconds(200))
        .Select(y => $"{x + 1}-{(char)(65 + y)}")
        .Take(3));

var results = await source.MergeOrdered(2).ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");

产出(不良):

代码语言:javascript
复制
Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C

理想的产出是:

代码语言:javascript
复制
Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-C

Clarification:关于值的排序,值本身是无关的。重要的是它们的起源内部序列的顺序,以及它们在这个序列中的位置。第一个内部序列中的所有值都应该首先发出(按其原始顺序),然后是第二个内部序列中的所有值,然后是第三个内部序列中的所有值,等等。

EN

回答 1

Stack Overflow用户

发布于 2020-11-15 06:47:11

这是无法观察到的,无法知道任何内部观测值的最后值是否是应该产生的第一个值。

例如,您可以拥有以下内容:

代码语言:javascript
复制
Stream of observables: +--1---2---3--|
Observable-1         :    +------------B--------A-|
Observable-2         :        +--C--------D-|
Observable-3         :            +-E--------F-|
Desirable merging    : +------------------------ABCDEF|

在这种情况下,我会这样做:

代码语言:javascript
复制
IObservable<char> query =
    sources
        .ToObservable()
        .Merge()
        .ToArray()
        .SelectMany(xs => xs.OrderBy(x => x));
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64841312

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档