首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >System.Reactive中的并发订户执行

System.Reactive中的并发订户执行
EN

Stack Overflow用户
提问于 2021-09-06 07:33:45
回答 1查看 132关注 0票数 1

我正在编写一个批处理管道,每Y秒处理X个未完成的操作。感觉System.Reactive很适合这一点,但我无法让订阅者并行执行。我的代码如下所示:

代码语言:javascript
复制
var subject = new Subject<int>();

var concurrentCount = 0;

using var reader = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Subscribe(list => 
    {
        var c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
        Interlocked.Decrement(ref concurrentCount);
    });
    
Parallel.For(0, 1_000_000, i =>
{
    subject.OnNext(i);
 });
subject.OnCompleted();

是否有一种以并发方式从这个缓冲的Subject中读取的优雅方法?

EN

回答 1

Stack Overflow用户

发布于 2021-09-07 12:12:31

你这么说:

代码语言:javascript
复制
// This never gets printed, because Subscribe is only ever called on a single thread.

只是这不是真的。什么都不打印的原因是Subscribe中的代码以锁定的方式发生--一次只有一个线程在Subscribe中执行,因此您正在递增该值,然后几乎立即递减它。既然它从零开始,它就永远没有机会升过1

这仅仅是因为Rx的合同。一次订阅中只有一个线程。

我们可以解决这个问题。

试试下面的代码:

代码语言:javascript
复制
using var reader = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .SelectMany(list =>
        Observable
            .Start(() =>
            {
                var c = Interlocked.Increment(ref concurrentCount);
                Console.WriteLine("Starting {0} simultaneous batches", c);
            })
            .Finally(() =>
            {
                var c = Interlocked.Decrement(ref concurrentCount);
                Console.WriteLine("Ending {0} simultaneous batches", c);
            }))
    .Subscribe();

现在,当我运行它(使用比您设置的1_000_000迭代更少的)时,输出如下所示:

代码语言:javascript
复制
Starting 1 simultaneous batches
Starting 4 simultaneous batches
Ending 3 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Starting 3 simultaneous batches
Ending 1 simultaneous batches
Ending 2 simultaneous batches
Starting 4 simultaneous batches
Starting 5 simultaneous batches
Ending 3 simultaneous batches
Starting 2 simultaneous batches
Starting 2 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Ending 0 simultaneous batches
Ending 4 simultaneous batches
Ending 1 simultaneous batches
Starting 1 simultaneous batches
Starting 1 simultaneous batches
Ending 0 simultaneous batches
Ending 0 simultaneous batches
票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69070794

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档