首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >产生随机结果的C# Rx可观测值

产生随机结果的C# Rx可观测值
EN

Stack Overflow用户
提问于 2020-07-28 00:20:32
回答 2查看 76关注 0票数 0

考虑下面的程序;

代码语言:javascript
复制
class Program
{
    static IObservable<int> GetNumbers()
    {
        var observable = Observable.Empty<int>();
        foreach (var i in Enumerable.Range(1, 10))
        {
            observable = observable.Concat(Observable.FromAsync(() => Task.Run(() =>
            {
                Console.WriteLine($"Producing {i}");
                Thread.Sleep(1000);
                return i;
            })));
        }

        return observable;
    }

    static async Task LogNumbers(IObservable<int> observable)
    {
        var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
        await observable;
        subscription.Dispose();
    }

    static void Main(string[] args)
    {
        LogNumbers(GetNumbers()).Wait();
        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

它会产生以下输出

代码语言:javascript
复制
Producing 1
Producing 1
Producing 2
Consuming 1
Producing 2
Producing 3
Consuming 2
Producing 3
Producing 4
Consuming 3
Producing 4
Producing 5
Consuming 4
Producing 5
Producing 6
Consuming 5
Producing 6
Producing 7
Consuming 6
Producing 7
Producing 8
Consuming 7
Producing 8
Producing 9
Consuming 8
Producing 9
Producing 10
Consuming 9
Producing 10
Finished

它写出两个“产生x”语句和一个“消费x”语句。它为什么要这样做?为什么它从来没有写出预期的最终“消费10”语句?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-07-28 01:04:56

您将获得两条生产线的副本,因为您订阅了两次。最有可能的是,您没有得到消费10,因为当第二个订阅结束时,第一个订阅将被取消。如果你有时会得到消耗的10,我不会感到惊讶,仅仅是因为当时任务的运行顺序不同。

代码语言:javascript
复制
static async Task LogNumbers(IObservable<int> observable)
{
    //This is the first subscription
    var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));

    //This is the second subscription
    await observable;

    subscription.Dispose();
}

按照编写GetNumbers函数的方式,每次订阅可观察对象都会触发自己的一组10个任务运行,从而触发自己的一组输出。第一个订阅还监视所产生的值,并输出一个消费行。第二个订阅不处理生成的值,因为您没有使用await observable的值,但会导致运行第二组任务。

您可以通过对LogNumbers的参数使用Publish().RefCount()来消除第二个订阅,或者改为使用一个TaskCompletionSource并将其从当前不在第一个订阅中使用的OnError和OnComplete函数中标记为已完成。它们看起来像这样:

代码语言:javascript
复制
static async Task LogNumbersWithRefCount(IObservable<int> observable)
{
    observable = observable.Publish().RefCount();
    var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
    await observable;
    subscription.Dispose();
}

static async Task LogNumbersTCS(IObservable<int> observable)
{
    var t = new TaskCompletionSource<object>()
    var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"),
                       ex => t.TrySetException(ex),
                       () => t.TrySetResult(null));
    return t.Task;
}
票数 4
EN

Stack Overflow用户

发布于 2020-07-29 20:53:34

Gideon为你解决了这个问题,但当我开始在评论中添加一些提示时,我认为发布一个完整的解决方案可能会很好。试试这个:

代码语言:javascript
复制
static IObservable<int> GetNumbers() =>
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(i => (int)i + 1)
        .Do(i => Console.WriteLine($"Producing {i}"))
        .Take(10);

static Task LogNumbers(IObservable<int> observable) =>
    observable
        .Do(i => Console.WriteLine($"Consuming {i}"))
        .ToArray()
        .ToTask();

static void Main(string[] args)
{
    LogNumbers(GetNumbers()).Wait();
    Console.WriteLine("Finished");
    Console.ReadLine();
}

或者更干净利落地:

代码语言:javascript
复制
static IObservable<int> GetNumbers() =>
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(i => (int)i + 1)
        .Do(i => Console.WriteLine($"Producing {i}"))
        .Take(10);

static IObservable<int> LogNumbers(IObservable<int> observable) =>
    observable
        .Do(i => Console.WriteLine($"Consuming {i}"));

static async Task Main(string[] args)
{
    await LogNumbers(GetNumbers());
    Console.WriteLine("Finished");
    Console.ReadLine();
}

您可以直接对可观察对象执行await

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63119350

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档