首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在某个地方有Ix.NET (System.Interactive)的例子吗?

在某个地方有Ix.NET (System.Interactive)的例子吗?
EN

Stack Overflow用户
提问于 2018-03-08 02:52:42
回答 1查看 3K关注 0票数 7

我有一个异步方法,例如:

代码语言:javascript
复制
public async Task<T> GetAsync()
{

}

并将从下列组织调用:

代码语言:javascript
复制
public async Task<IEnumerable<T>> GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        yield return result;
    }
}

上面的语法是无效的,但基本上我是指异步生成器。我知道它可以通过观察来处理。我用Rx.NET做了实验,在一定程度上起了作用。但是我试图避免它给代码库带来的复杂性,更重要的是,上面的需求本质上仍然不是一个反应性系统(我们的系统仍然是基于拉动式的)。例如,我只会在一段时间内聆听传入的异步流,并且我必须阻止生产者(而不仅仅是取消订阅消费者)从消费者端退出。

我可以像这样反转方法签名:

代码语言:javascript
复制
public IEnumerable<Task<T>> GetAllAsync()

但是,这使得执行LINQ操作有点棘手而不阻塞。我希望它是非阻塞的,以及不加载整个东西到内存。这个库:AsyncEnumerable做的正是我想要的,但是如何用Ix.NET来做同样的事情呢?我相信它们是为了同一件事而设计的。

换句话说,在处理Ix.NET时,如何使用IAsyncEnumerable生成await?喜欢,

代码语言:javascript
复制
public async IAsyncEnumerable GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        return // what?
    }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-08-30 22:56:21

(编辑)

使用System.Linq.Async 4.0.0来自NuGet,现在您可以使用SelectAwait

代码语言:javascript
复制
class Program
{
    static void Main(string[] args)
    {
        Task.Run(async () =>
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));

        Thread.Sleep(4000);
    }

    static IAsyncEnumerable<string> GetAllAsync()
    {
        var something = new[] { 1, 2, 3 };

        return something
            .ToAsyncEnumerable()
            .SelectAwait(async (x) => await GetAsync(x));
    }

    static async Task<string> GetAsync(int item)
    {
        await Task.Delay(1000); // heavy
        return "got " + item;
    }
}

(过时)

使用来自System.Interactive.Async 3.2.0的NuGet,这个怎么样?目前,Select()不支持异步lambda,您必须自己实现它。

更好地支持基于异步任务的AsyncEnumerable重载

代码语言:javascript
复制
class Program
{
    static void Main(string[] args)
    {
        Task.Run(async () =>
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));

        Thread.Sleep(4000);
    }

    static IAsyncEnumerable<string> GetAllAsync()
    {
        var something = new[] { 1, 2, 3 };

        return something.SelectAsync(async (x) => await GetAsync(x));
    }

    static async Task<string> GetAsync(int item)
    {
        await Task.Delay(1000); // heavy
        return "got " + item;
    }
}

static class AsyncEnumerableExtensions
{
    public static IAsyncEnumerable<TResult> SelectAsync<T, TResult>(this IEnumerable<T> enumerable, Func<T, Task<TResult>> selector)
    {
        return AsyncEnumerable.CreateEnumerable(() =>
        {
            var enumerator = enumerable.GetEnumerator();
            var current = default(TResult);
            return AsyncEnumerable.CreateEnumerator(async c =>
                {
                    var moveNext = enumerator.MoveNext();
                    current = moveNext
                        ? await selector(enumerator.Current).ConfigureAwait(false)
                        : default(TResult);
                    return moveNext;
                },
                () => current,
                () => enumerator.Dispose());
        });
    }
}

从这个例子中引用了扩展方法。https://github.com/maca88/AsyncGenerator/issues/94#issuecomment-385286972

票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49164605

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档