我正在调查TPL-Dataflow是否能够让我们从为高并发应用程序编写带有锁和监视器的样板代码中解脱出来。
因此,我正在模拟一个简单的场景,其中有一个生产者和多个消费者,每个消费者都希望获得所有产生的消息。而且,如果一些消费者比其他消费者慢,它不应该导致系统停滞。
代码如下:
using NLog;
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;
namespace ConsoleApp10
{
internal sealed class Program
{
private static readonly Logger m_logger = LogManager.GetCurrentClassLogger();
static void Main(string[] args)
{
BroadcastBlock<int> root = new BroadcastBlock<int>(d => d);
ExecutionDataflowBlockOptions consumerOptions = new ExecutionDataflowBlockOptions() {
BoundedCapacity = 3
};
for (int consumerIndex = 0; consumerIndex < 5; ++consumerIndex)
{
int c = consumerIndex;
ActionBlock<int> consumer = new ActionBlock<int>(
(int d) => {
m_logger.Trace($"[#{c}] Starting consuming {d}");
Thread.Sleep(c * 100);
m_logger.Trace($"[#{c}] Ended consuming {d}");
},
consumerOptions
);
root.LinkTo(consumer);
}
Producer(10, root);
Console.ReadLine();
}
private static void Producer(int n, ITargetBlock<int> target)
{
for (int i = 0; i < n; ++i)
{
m_logger.Trace($"Starting producing {i}");
if (!target.Post(i))
{
throw new Exception($"Failed to post message #{i}");
}
m_logger.Trace($"Ending producing {i}");
Thread.Sleep(50);
}
}
}
}正如您可能看到的,我将消费者中的缓冲区大小限制为3(以防止速度慢的消费者的缓冲区无限增长)。
每个下一个消费者的速度都比前一个慢。消费者#0是最快的,没有延迟。生产者在生产过程中有一些小的延迟。
我期望消费者#0至少会使用所有的消息,消费者#4不会收到一些消息,因为它的缓冲区会溢出。
结果如下:
2021-04-15 22:44:15.4905 [T1] Starting producing 0
2021-04-15 22:44:15.5049 [T1] Ending producing 0
2021-04-15 22:44:15.5166 [T4] [#4] Starting consuming 0
2021-04-15 22:44:15.5285 [T7] [#0] Starting consuming 0
2021-04-15 22:44:15.5285 [T7] [#0] Ended consuming 0
2021-04-15 22:44:15.5285 [T7] [#1] Starting consuming 0
2021-04-15 22:44:15.5573 [T1] Starting producing 1
2021-04-15 22:44:15.5573 [T1] Ending producing 1
2021-04-15 22:44:15.5573 [T5] [#0] Starting consuming 1
2021-04-15 22:44:15.5573 [T5] [#0] Ended consuming 1
2021-04-15 22:44:15.5573 [T5] [#2] Starting consuming 0
2021-04-15 22:44:15.5573 [T6] [#3] Starting consuming 0
2021-04-15 22:44:15.6081 [T1] Starting producing 2
2021-04-15 22:44:15.6081 [T1] Ending producing 2
2021-04-15 22:44:15.6352 [T7] [#1] Ended consuming 0
2021-04-15 22:44:15.6352 [T7] [#1] Starting consuming 1
2021-04-15 22:44:15.6592 [T1] Starting producing 3
2021-04-15 22:44:15.6592 [T1] Ending producing 3
2021-04-15 22:44:15.7102 [T1] Starting producing 4
2021-04-15 22:44:15.7102 [T1] Ending producing 4
2021-04-15 22:44:15.7353 [T7] [#1] Ended consuming 1
2021-04-15 22:44:15.7353 [T7] [#1] Starting consuming 2
2021-04-15 22:44:15.7612 [T5] [#2] Ended consuming 0
2021-04-15 22:44:15.7612 [T5] [#2] Starting consuming 1
2021-04-15 22:44:15.7612 [T1] Starting producing 5
2021-04-15 22:44:15.7612 [T1] Ending producing 5
2021-04-15 22:44:15.8132 [T1] Starting producing 6
2021-04-15 22:44:15.8132 [T1] Ending producing 6
2021-04-15 22:44:15.8420 [T7] [#1] Ended consuming 2
2021-04-15 22:44:15.8420 [T7] [#1] Starting consuming 3
2021-04-15 22:44:15.8603 [T6] [#3] Ended consuming 0
2021-04-15 22:44:15.8603 [T6] [#3] Starting consuming 1
2021-04-15 22:44:15.8764 [T1] Starting producing 7
2021-04-15 22:44:15.8764 [T1] Ending producing 7
2021-04-15 22:44:15.9174 [T4] [#4] Ended consuming 0
2021-04-15 22:44:15.9174 [T4] [#4] Starting consuming 1
2021-04-15 22:44:15.9369 [T1] Starting producing 8
2021-04-15 22:44:15.9369 [T1] Ending producing 8
2021-04-15 22:44:15.9509 [T7] [#1] Ended consuming 3
2021-04-15 22:44:15.9509 [T7] [#1] Starting consuming 4
2021-04-15 22:44:15.9639 [T5] [#2] Ended consuming 1
2021-04-15 22:44:15.9639 [T5] [#2] Starting consuming 2
2021-04-15 22:44:15.9874 [T1] Starting producing 9
2021-04-15 22:44:15.9874 [T1] Ending producing 9
2021-04-15 22:44:16.0515 [T7] [#1] Ended consuming 4
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 2
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 2
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 3
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 3
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 4
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 4
2021-04-15 22:44:16.0515 [T7] [#1] Starting consuming 5
2021-04-15 22:44:16.1525 [T7] [#1] Ended consuming 5
2021-04-15 22:44:16.1525 [T7] [#1] Starting consuming 6
2021-04-15 22:44:16.1525 [T6] [#3] Ended consuming 1
2021-04-15 22:44:16.1525 [T6] [#3] Starting consuming 2
2021-04-15 22:44:16.1645 [T5] [#2] Ended consuming 2
2021-04-15 22:44:16.1645 [T5] [#2] Starting consuming 4
2021-04-15 22:44:16.2526 [T7] [#1] Ended consuming 6
2021-04-15 22:44:16.2526 [T7] [#1] Starting consuming 7
2021-04-15 22:44:16.3177 [T4] [#4] Ended consuming 1
2021-04-15 22:44:16.3177 [T4] [#4] Starting consuming 2
2021-04-15 22:44:16.3537 [T7] [#1] Ended consuming 7
2021-04-15 22:44:16.3537 [T7] [#1] Starting consuming 9
2021-04-15 22:44:16.3537 [T5] [#2] Ended consuming 4
2021-04-15 22:44:16.3537 [T5] [#2] Starting consuming 5
2021-04-15 22:44:16.4547 [T7] [#1] Ended consuming 9
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 5
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 5
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 6
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 6
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 7
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 7
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 9
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 9
2021-04-15 22:44:16.4607 [T6] [#3] Ended consuming 2
2021-04-15 22:44:16.4607 [T6] [#3] Starting consuming 4
2021-04-15 22:44:16.5648 [T5] [#2] Ended consuming 5
2021-04-15 22:44:16.5648 [T5] [#2] Starting consuming 9
2021-04-15 22:44:16.7179 [T4] [#4] Ended consuming 2
2021-04-15 22:44:16.7179 [T4] [#4] Starting consuming 4
2021-04-15 22:44:16.7610 [T6] [#3] Ended consuming 4
2021-04-15 22:44:16.7610 [T6] [#3] Starting consuming 9
2021-04-15 22:44:16.7610 [T5] [#2] Ended consuming 9
2021-04-15 22:44:17.0611 [T6] [#3] Ended consuming 9
2021-04-15 22:44:17.1182 [T4] [#4] Ended consuming 4
2021-04-15 22:44:17.1182 [T4] [#4] Starting consuming 9
2021-04-15 22:44:17.5185 [T4] [#4] Ended consuming 9 令我困惑的是,消费者#0永远不会收到消息8。实际上,没有其他消费者收到这条消息。为甚麽呢?这是数据流的预期行为吗?
如果你想检查我的目标,如下所示(我使用AsyncWrapper NLog.config来防止文件访问影响我的实验结果):
<?xml version="1.0" encoding="utf-8" ?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.nlog-project.org/schemas/NLog.xsd NLog.xsd"
internalLogFile="nlog.log"
internalLogLevel="Warn"
throwExceptions="false"
parseMessageTemplates="false"
>
<variable name="varExceptionMsg" value="${exception:format=Message}"/>
<variable name="varMessageWithException" value="${message}${onexception:inner= ${varExceptionMsg}}"/>
<variable name="msg4File" value="${longdate} [T${threadid}${threadname}] ${varMessageWithException} ${onexception:inner=${newline}${exception:format=tostring:maxInnerExceptionLevel=2:innerFormat=tostring}}" />
<targets>
<target name="file" xsi:type="AsyncWrapper" queueLimit="5000" overflowAction="Discard">
<target xsi:type="File"
layout="${msg4File}"
fileName="${basedir}/logs/${processname}.${shortdate}.log"
keepFileOpen="true"
encoding="utf-8"
/>
</target>
</targets>
<rules>
<logger name="*" minlevel="Trace" writeTo="file" />
</rules>
</nlog>发布于 2021-05-07 22:58:42
您需要在根上执行.Complete(),然后 Main ()需要等待所有消费者在主端之前完成他们的食物。
在Main的顶部:
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };在for循环中:
在您的ReadLine()之前添加以下内容:
root.Complete();
Task.WaitAll(comps.ToArray());https://stackoverflow.com/questions/67115184
复制相似问题