到目前为止,Rx.Net还没有类似的concatMap,但是考虑到可用的功能,肯定有一种方法可以获得类似的行为。我现在有一个observable.SelectMany(x => ProcessItemAsync(item).ToObservable()),其中ProcessItemAsync是一个异步方法,我希望按顺序执行这个方法,而不是一次执行所有的项。
如果我正确理解Rx,observable.ConcatMap(x => ProcessItemAsync(item).ToObservable())应该这样做,但它目前并不存在于Rx.Net中,那么实现相同行为的另一种方法是什么呢?
我可能有多个可观测源,而且每个源都可以并行执行ProcessItemAsync,应该是在流顺序内,以保持输入/输出顺序,所以我不能将它锁定在ProcessItemAsync上。
更新:
考虑到类似的问题,反应性扩展SelectMany和Concat,我举了一个例子:https://dotnetfiddle.net/cG3T2v --使用SemaphoreSlim是我成功地工作的唯一方法。
(这里是示例中的代码)
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
static readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
static async Task Main(string[] args)
{
Console.WriteLine("Kinda in order ...");
var kindaInOrder = Observable.Range(1, 4)
.Select(x => LongProcessAsync(x).ToObservable())
.Concat()
.Do(x => Console.WriteLine($"{x} concatenated"));
await kindaInOrder.RunAsync(CancellationToken.None);
Console.WriteLine();
Console.WriteLine("Completely in order ...");
var completelyInOrder = Observable.Range(1, 4)
.Select(x => OrderedLongProcessAsync(x).ToObservable())
.Concat()
.Do(x => Console.WriteLine($"{x} concatenated"));
await completelyInOrder.RunAsync(CancellationToken.None);
}
static async Task<int> LongProcessAsync(int n)
{
Console.WriteLine($"Job {n} started");
await Task.Delay(TimeSpan.FromSeconds(4 - n));
Console.WriteLine($"Job {n} done");
return n;
}
static async Task<int> OrderedLongProcessAsync(int n)
{
await semaphore.WaitAsync();
try
{
return await LongProcessAsync(n);
}
finally
{
semaphore.Release();
}
}
}
}发布于 2022-07-06 11:53:24
您可以使用.Select()和.Merge()的组合,并将maxConcurrent参数设置为1,获得所描述的行为(订阅每个内部生成的行为,只有在前面的行为完成时才能观察到)。
static IObservable<TU> ConcatMap<T, TU>(this IObservable<T> source, Func<T, IObservable<TU>> selector) {
return source.Select(selector).Merge(maxConcurrent: 1);
}为了确保长时间运行的操作只在前面的操作完成后才启动,选择器必须返回“冷”可观测值:
source.ConcatMap(t => Observable.FromAsync(() => LongProcessAsync(t)))发布于 2019-07-10 15:29:33
通过将ConcatMap和.Select()结合起来,可以得到.Concat()的行为。
粘贴到下面的LINQPad以试用它
public void Main()
{
// Generate 100 items and 'process' async
Observable
.Range(0, 100)
.Select(x => ProcessItemAsync(x))
.Concat()
.Dump();
}
private readonly Random _rnd = new Random();
public async Task<int> ProcessItemAsync(int item)
{
await Task.Delay(_rnd.Next(0, 1000));
return item;
}代码启动100个“作业”,需要不同的时间才能完成,但是您将看到,所得到的可观察结果按照作业启动的顺序返回结果,而不是它们完成的顺序,如果使用.SelectMany(),就会出现这种情况。
(编辑:在最初的文章中,我做了一个扩展方法,它选择/映射了给定的异步功能。然后,我创建了一个类,在那里等待、索引和存储结果,以便按顺序发出。但是我在这里找到了一个简单得多的解决方案:对反应性扩展SelectMany和Concat的回答,它是Select组合。)
发布于 2019-11-01 18:42:30
就我在这里的评论采取后续行动是建议的解决办法。
相反,
.Select(x => LongProcessAsync(x).ToObservable())使用以下方法:
.Select(x => Observable.Create<int>(async o =>
{
try
{
var result = await LongProcessAsync(x);
o.OnNext(result);
o.OnCompleted();
}
catch (Exception ex)
{
o.OnError(ex);
}
}))https://stackoverflow.com/questions/56696317
复制相似问题