我找了一份副本却没有找到。我所拥有的是一个嵌套的可观察的IObservable<IObservable<T>>,我想将它压平成一个IObservable<T>。我不想使用Concat操作符,因为它将对每个内部可观测到的订阅延迟到以前可观测的完成。这是一个问题,因为内部可观测值是冷的,我希望它们在外部可观测到的值发出后立即开始发射T值。我也不想使用Merge操作符,因为它扰乱了发出的值的顺序。下面的大理石图显示了Merge操作符的问题行为(就我的情况而言),以及所希望的合并行为。
Stream of observables: +----1------2-----3----|
Observable-1 : +--A-----------------B-------|
Observable-2 : +---C---------------------D------|
Observable-3 : +--E--------------------F-------|
Merge (undesirable) : +-------A-------C----E----B-----------D---F-------|
Desirable merging : +-------A-----------------B-------C---D------EF---|由可观测-1发出的所有值都应先于可观测的-2发出的任何值。同样的情况也应该适用于可观测的-2和可观测的-3,等等。
我喜欢Merge操作符的地方是,它允许配置内部可观察到的最大并发订阅。我想用我正在尝试实现的自定义MergeOrdered操作符来保留这个功能。以下是我的在建方法:
public static IObservable<T> MergeOrdered<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency = Int32.MaxValue)
{
return source.Merge(maximumConcurrency); // How to make it ordered?
}下面是一个用法示例:
var source = Observable
.Interval(TimeSpan.FromMilliseconds(300))
.Take(4)
.Select(x => Observable
.Interval(TimeSpan.FromMilliseconds(200))
.Select(y => $"{x + 1}-{(char)(65 + y)}")
.Take(3));
var results = await source.MergeOrdered(2).ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");产出(不良):
Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C理想的产出是:
Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-CClarification:关于值的排序,值本身是无关的。重要的是它们的起源内部序列的顺序,以及它们在这个序列中的位置。第一个内部序列中的所有值都应该首先发出(按其原始顺序),然后是第二个内部序列中的所有值,然后是第三个内部序列中的所有值,等等。
发布于 2020-11-15 06:47:11
这是无法观察到的,无法知道任何内部观测值的最后值是否是应该产生的第一个值。
例如,您可以拥有以下内容:
Stream of observables: +--1---2---3--|
Observable-1 : +------------B--------A-|
Observable-2 : +--C--------D-|
Observable-3 : +-E--------F-|
Desirable merging : +------------------------ABCDEF|在这种情况下,我会这样做:
IObservable<char> query =
sources
.ToObservable()
.Merge()
.ToArray()
.SelectMany(xs => xs.OrderBy(x => x));https://stackoverflow.com/questions/64841312
复制相似问题