source1发射A、B、C、D等,但从未完成
source2发出1,2并完成
我想合并到A1,B2,C1,D2等
更新
我最初的注意力是Zip和Repeat,这是来自Theodor的建议,但是这会创建一个锁,因为source2的生成非常昂贵。
Enigmativity的最后一个注释解决了这个问题
source1.Zip(source2.ToEnumerable().ToArray().Repeat())发布于 2022-01-01 05:42:23
由于您想无限期地重复source2,并且您说它很冷(也就是说,它每次产生相同的值集,而且通常是以相同的频率计算),而且代价很高,所以我们希望将IObservable<T>转换为一个T[],以确保它只计算一次。
var array = source2.ToEnumerable().ToArray();
var output = source1.Zip(array.Repeat(), (x, y) => (x, y));发布于 2022-01-01 07:05:39
假设理想的大理石图是这样的:
Source1: +--------A-------B-------C--------D-------|
Source2: +----1--------------2--------|
Merged: +--------A1---------B2-------C1---D2------|下面是具有此行为的ZipWithRepeated操作符:
static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
this IObservable<TFirst> first, IObservable<TSecond> second)
{
return second.Replay(replayed => first.ToAsyncEnumerable()
.Zip(replayed.ToAsyncEnumerable().Repeat())
.ToObservable());
}用法示例:
var merged = source1.ZipWithRepeated(source2);这个解决方案需要依赖于System.Linq.Async和System.Interactive.Async包,因为在压缩之前,这两个序列都被转换为IAsyncEnumerable<T>。
Alternative:与其依赖于Rx Replay操作符来缓冲source2序列,更有效的解决方案是在从可观察到的转换为异步枚举之后进行缓冲。AFAICS在官方的Rx/Ix库中不支持重放/回忆录IAsyncEnumerable<T>,但是创建一个带有嵌入式缓冲的自定义Repeat操作符并不困难。下面是ZipWithRepeated操作符的另一种实现,它是基于这种思想的:
static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
this IObservable<TFirst> first, IObservable<TSecond> second)
{
return first.ToAsyncEnumerable()
.Zip(second.ToAsyncEnumerable().RepeatBuffered())
.ToObservable();
}
private async static IAsyncEnumerable<TSource> RepeatBuffered<TSource>(
this IAsyncEnumerable<TSource> source,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var buffer = new List<TSource>();
await foreach (var item in source
.WithCancellation(cancellationToken).ConfigureAwait(false))
{
buffer.Add(item); yield return item;
}
while (true) foreach (var item in buffer) yield return item;
}此实现不依赖于System.Interactive.Async包,而仅依赖于System.Linq.Async包。
https://stackoverflow.com/questions/70546962
复制相似问题