我正在编写一个批处理管道,每Y秒处理X个未完成的操作。感觉System.Reactive很适合这一点,但我无法让订阅者并行执行。我的代码如下所示:
var subject = new Subject<int>();
var concurrentCount = 0;
using var reader = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.Subscribe(list =>
{
var c = Interlocked.Increment(ref concurrentCount);
if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
Interlocked.Decrement(ref concurrentCount);
});
Parallel.For(0, 1_000_000, i =>
{
subject.OnNext(i);
});
subject.OnCompleted();是否有一种以并发方式从这个缓冲的Subject中读取的优雅方法?
发布于 2021-09-07 12:12:31
你这么说:
// This never gets printed, because Subscribe is only ever called on a single thread.只是这不是真的。什么都不打印的原因是Subscribe中的代码以锁定的方式发生--一次只有一个线程在Subscribe中执行,因此您正在递增该值,然后几乎立即递减它。既然它从零开始,它就永远没有机会升过1。
这仅仅是因为Rx的合同。一次订阅中只有一个线程。
我们可以解决这个问题。
试试下面的代码:
using var reader = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.SelectMany(list =>
Observable
.Start(() =>
{
var c = Interlocked.Increment(ref concurrentCount);
Console.WriteLine("Starting {0} simultaneous batches", c);
})
.Finally(() =>
{
var c = Interlocked.Decrement(ref concurrentCount);
Console.WriteLine("Ending {0} simultaneous batches", c);
}))
.Subscribe();现在,当我运行它(使用比您设置的1_000_000迭代更少的)时,输出如下所示:
Starting 1 simultaneous batches
Starting 4 simultaneous batches
Ending 3 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Starting 3 simultaneous batches
Ending 1 simultaneous batches
Ending 2 simultaneous batches
Starting 4 simultaneous batches
Starting 5 simultaneous batches
Ending 3 simultaneous batches
Starting 2 simultaneous batches
Starting 2 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Ending 0 simultaneous batches
Ending 4 simultaneous batches
Ending 1 simultaneous batches
Starting 1 simultaneous batches
Starting 1 simultaneous batches
Ending 0 simultaneous batches
Ending 0 simultaneous batcheshttps://stackoverflow.com/questions/69070794
复制相似问题