首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将热source1与冷source2合并

将热source1与冷source2合并
EN

Stack Overflow用户
提问于 2022-01-01 04:26:56
回答 2查看 119关注 0票数 2

source1发射A、B、C、D等,但从未完成

source2发出1,2并完成

我想合并到A1,B2,C1,D2等

更新

我最初的注意力是ZipRepeat,这是来自Theodor的建议,但是这会创建一个锁,因为source2的生成非常昂贵。

Enigmativity的最后一个注释解决了这个问题

代码语言:javascript
复制
source1.Zip(source2.ToEnumerable().ToArray().Repeat())
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-01-01 05:42:23

由于您想无限期地重复source2,并且您说它很冷(也就是说,它每次产生相同的值集,而且通常是以相同的频率计算),而且代价很高,所以我们希望将IObservable<T>转换为一个T[],以确保它只计算一次。

代码语言:javascript
复制
var array = source2.ToEnumerable().ToArray();
var output = source1.Zip(array.Repeat(), (x, y) => (x, y));
票数 1
EN

Stack Overflow用户

发布于 2022-01-01 07:05:39

假设理想的大理石图是这样的:

代码语言:javascript
复制
Source1: +--------A-------B-------C--------D-------|
Source2: +----1--------------2--------|
Merged:  +--------A1---------B2-------C1---D2------|

下面是具有此行为的ZipWithRepeated操作符:

代码语言:javascript
复制
static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
    this IObservable<TFirst> first, IObservable<TSecond> second)
{
    return second.Replay(replayed => first.ToAsyncEnumerable()
        .Zip(replayed.ToAsyncEnumerable().Repeat())
        .ToObservable());
}

用法示例:

代码语言:javascript
复制
var merged = source1.ZipWithRepeated(source2);

这个解决方案需要依赖于System.Linq.AsyncSystem.Interactive.Async包,因为在压缩之前,这两个序列都被转换为IAsyncEnumerable<T>

Alternative:与其依赖于Rx Replay操作符来缓冲source2序列,更有效的解决方案是在从可观察到的转换为异步枚举之后进行缓冲。AFAICS在官方的Rx/Ix库中不支持重放/回忆录IAsyncEnumerable<T>,但是创建一个带有嵌入式缓冲的自定义Repeat操作符并不困难。下面是ZipWithRepeated操作符的另一种实现,它是基于这种思想的:

代码语言:javascript
复制
static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
    this IObservable<TFirst> first, IObservable<TSecond> second)
{
    return first.ToAsyncEnumerable()
        .Zip(second.ToAsyncEnumerable().RepeatBuffered())
        .ToObservable();
}

private async static IAsyncEnumerable<TSource> RepeatBuffered<TSource>(
    this IAsyncEnumerable<TSource> source,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var buffer = new List<TSource>();
    await foreach (var item in source
        .WithCancellation(cancellationToken).ConfigureAwait(false))
    {
        buffer.Add(item); yield return item;
    }
    while (true) foreach (var item in buffer) yield return item;
}

此实现不依赖于System.Interactive.Async包,而仅依赖于System.Linq.Async包。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70546962

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档