我有一个异步方法,例如:
public async Task<T> GetAsync()
{
}并将从下列组织调用:
public async Task<IEnumerable<T>> GetAllAsync()
{
foreach (var item in something)
{
var result = await GetAsync();
yield return result;
}
}上面的语法是无效的,但基本上我是指异步生成器。我知道它可以通过观察来处理。我用Rx.NET做了实验,在一定程度上起了作用。但是我试图避免它给代码库带来的复杂性,更重要的是,上面的需求本质上仍然不是一个反应性系统(我们的系统仍然是基于拉动式的)。例如,我只会在一段时间内聆听传入的异步流,并且我必须阻止生产者(而不仅仅是取消订阅消费者)从消费者端退出。
我可以像这样反转方法签名:
public IEnumerable<Task<T>> GetAllAsync()但是,这使得执行LINQ操作有点棘手而不阻塞。我希望它是非阻塞的,以及不加载整个东西到内存。这个库:AsyncEnumerable做的正是我想要的,但是如何用Ix.NET来做同样的事情呢?我相信它们是为了同一件事而设计的。
换句话说,在处理Ix.NET时,如何使用IAsyncEnumerable生成await?喜欢,
public async IAsyncEnumerable GetAllAsync()
{
foreach (var item in something)
{
var result = await GetAsync();
return // what?
}
}发布于 2018-08-30 22:56:21
(编辑)
使用System.Linq.Async 4.0.0来自NuGet,现在您可以使用SelectAwait。
class Program
{
static void Main(string[] args)
{
Task.Run(async () =>
await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));
Thread.Sleep(4000);
}
static IAsyncEnumerable<string> GetAllAsync()
{
var something = new[] { 1, 2, 3 };
return something
.ToAsyncEnumerable()
.SelectAwait(async (x) => await GetAsync(x));
}
static async Task<string> GetAsync(int item)
{
await Task.Delay(1000); // heavy
return "got " + item;
}
}(过时)
使用来自System.Interactive.Async 3.2.0的NuGet,这个怎么样?目前,Select()不支持异步lambda,您必须自己实现它。
class Program
{
static void Main(string[] args)
{
Task.Run(async () =>
await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));
Thread.Sleep(4000);
}
static IAsyncEnumerable<string> GetAllAsync()
{
var something = new[] { 1, 2, 3 };
return something.SelectAsync(async (x) => await GetAsync(x));
}
static async Task<string> GetAsync(int item)
{
await Task.Delay(1000); // heavy
return "got " + item;
}
}
static class AsyncEnumerableExtensions
{
public static IAsyncEnumerable<TResult> SelectAsync<T, TResult>(this IEnumerable<T> enumerable, Func<T, Task<TResult>> selector)
{
return AsyncEnumerable.CreateEnumerable(() =>
{
var enumerator = enumerable.GetEnumerator();
var current = default(TResult);
return AsyncEnumerable.CreateEnumerator(async c =>
{
var moveNext = enumerator.MoveNext();
current = moveNext
? await selector(enumerator.Current).ConfigureAwait(false)
: default(TResult);
return moveNext;
},
() => current,
() => enumerator.Dispose());
});
}
}从这个例子中引用了扩展方法。https://github.com/maca88/AsyncGenerator/issues/94#issuecomment-385286972
https://stackoverflow.com/questions/49164605
复制相似问题