首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在ConcatMap中定义Rx.Net?

如何在ConcatMap中定义Rx.Net?
EN

Stack Overflow用户
提问于 2019-06-21 03:03:18
回答 3查看 938关注 0票数 0

到目前为止,Rx.Net还没有类似的concatMap,但是考虑到可用的功能,肯定有一种方法可以获得类似的行为。我现在有一个observable.SelectMany(x => ProcessItemAsync(item).ToObservable()),其中ProcessItemAsync是一个异步方法,我希望按顺序执行这个方法,而不是一次执行所有的项。

如果我正确理解Rx,observable.ConcatMap(x => ProcessItemAsync(item).ToObservable())应该这样做,但它目前并不存在于Rx.Net中,那么实现相同行为的另一种方法是什么呢?

我可能有多个可观测源,而且每个源都可以并行执行ProcessItemAsync,应该是在流顺序内,以保持输入/输出顺序,所以我不能将它锁定在ProcessItemAsync上。

更新:

考虑到类似的问题,反应性扩展SelectMany和Concat,我举了一个例子:https://dotnetfiddle.net/cG3T2v --使用SemaphoreSlim是我成功地工作的唯一方法。

(这里是示例中的代码)

代码语言:javascript
复制
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

        static async Task Main(string[] args)
        {
            Console.WriteLine("Kinda in order ...");

            var kindaInOrder = Observable.Range(1, 4)
                .Select(x => LongProcessAsync(x).ToObservable())
                .Concat()
                .Do(x => Console.WriteLine($"{x} concatenated"));

            await kindaInOrder.RunAsync(CancellationToken.None);

            Console.WriteLine();
            Console.WriteLine("Completely in order ...");

            var completelyInOrder = Observable.Range(1, 4)
                .Select(x => OrderedLongProcessAsync(x).ToObservable())
                .Concat()
                .Do(x => Console.WriteLine($"{x} concatenated"));

            await completelyInOrder.RunAsync(CancellationToken.None);
        }

        static async Task<int> LongProcessAsync(int n)
        {
            Console.WriteLine($"Job {n} started");
            await Task.Delay(TimeSpan.FromSeconds(4 - n));
            Console.WriteLine($"Job {n} done");

            return n;
        }

        static async Task<int> OrderedLongProcessAsync(int n)
        {
            await semaphore.WaitAsync();
            try
            {
                return await LongProcessAsync(n);
            }
            finally
            {
                semaphore.Release();
            }
        }
    }
}
EN

回答 3

Stack Overflow用户

发布于 2022-07-06 11:53:24

您可以使用.Select().Merge()的组合,并将maxConcurrent参数设置为1,获得所描述的行为(订阅每个内部生成的行为,只有在前面的行为完成时才能观察到)。

代码语言:javascript
复制
static IObservable<TU> ConcatMap<T, TU>(this IObservable<T> source, Func<T, IObservable<TU>> selector) {
    return source.Select(selector).Merge(maxConcurrent: 1);
}

为了确保长时间运行的操作只在前面的操作完成后才启动,选择器必须返回“冷”可观测值:

代码语言:javascript
复制
source.ConcatMap(t => Observable.FromAsync(() => LongProcessAsync(t)))
票数 0
EN

Stack Overflow用户

发布于 2019-07-10 15:29:33

通过将ConcatMap和.Select()结合起来,可以得到.Concat()的行为。

粘贴到下面的LINQPad以试用它

代码语言:javascript
复制
public void Main()
{
    // Generate 100 items and 'process' async
    Observable
        .Range(0, 100)
        .Select(x => ProcessItemAsync(x))
        .Concat()
        .Dump();
}

private readonly Random _rnd = new Random();

public async Task<int> ProcessItemAsync(int item)
{
    await Task.Delay(_rnd.Next(0, 1000));
    return item;
}

代码启动100个“作业”,需要不同的时间才能完成,但是您将看到,所得到的可观察结果按照作业启动的顺序返回结果,而不是它们完成的顺序,如果使用.SelectMany(),就会出现这种情况。

(编辑:在最初的文章中,我做了一个扩展方法,它选择/映射了给定的异步功能。然后,我创建了一个类,在那里等待、索引和存储结果,以便按顺序发出。但是我在这里找到了一个简单得多的解决方案:对反应性扩展SelectMany和Concat的回答,它是Select组合。)

票数 -1
EN

Stack Overflow用户

发布于 2019-11-01 18:42:30

就我在这里的评论采取后续行动是建议的解决办法。

相反,

代码语言:javascript
复制
.Select(x => LongProcessAsync(x).ToObservable())

使用以下方法:

代码语言:javascript
复制
.Select(x => Observable.Create<int>(async o =>
{
    try
    {
        var result = await LongProcessAsync(x);
        o.OnNext(result);
        o.OnCompleted();
    }
    catch (Exception ex)
    {
        o.OnError(ex);
    }
}))
票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56696317

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档