首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ReceiveAsync中断/中断消息传递

ReceiveAsync中断/中断消息传递
EN

Stack Overflow用户
提问于 2018-09-07 09:05:55
回答 1查看 116关注 0票数 1

这个问题在试图实现建议的this problem解决方案时引起了人们的注意。

问题摘要

执行ReceiveAsync()从TransformBlock到WriteOnceBlock的调用会导致TransformBlock从流中从本质上删除自身。它停止传播任何类型的信息,无论是数据还是完成信号。

系统设计

该系统旨在通过一系列步骤解析大型CSV文件。

流程中有问题的部分可以(不熟练地)可视化如下:

平行四边形是BufferBlock,菱形是BroadcastBlocks,三角形是WriteOnceBlocks,箭头是TransformBlocks。实线表示用LinkTo()创建的链接,虚线表示从ParsedHeaderAndRecordJoiner到ParsedHeaderContainer块的ReceiveAsync()调用。我知道这个流有点不太理想,但这不是问题的主要原因。

代码

应用根

下面是类的一部分,它使用PropagateCompletion创建必要的块并将它们链接到一起

代码语言:javascript
复制
using (var cancellationSource = new CancellationTokenSource())
{
    var cancellationToken = cancellationSource.Token;
    var temporaryEntityInstance = new Card(); // Just as an example

    var producerQueue = queueFactory.CreateQueue<string>(new DataflowBlockOptions{CancellationToken = cancellationToken});
    var recordDistributor = distributorFactory.CreateDistributor<string>(s => (string)s.Clone(), 
        new DataflowBlockOptions { CancellationToken = cancellationToken });
    var headerRowContainer = containerFactory.CreateContainer<string>(s => (string)s.Clone(), 
        new DataflowBlockOptions { CancellationToken = cancellationToken });
    var headerRowParser = new HeaderRowParserFactory().CreateHeaderRowParser(temporaryEntityInstance.GetType(), ';', 
        new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
    var parsedHeaderContainer = containerFactory.CreateContainer<HeaderParsingResult>(HeaderParsingResult.Clone, 
        new DataflowBlockOptions { CancellationToken = cancellationToken});
    var parsedHeaderAndRecordJoiner = new ParsedHeaderAndRecordJoinerFactory().CreateParsedHeaderAndRecordJoiner(parsedHeaderContainer, 
        new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
    var entityParser = new entityParserFactory().CreateEntityParser(temporaryEntityInstance.GetType(), ';',
        dataflowBlockOptions: new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
    var entityDistributor = distributorFactory.CreateDistributor<EntityParsingResult>(EntityParsingResult.Clone, 
        new DataflowBlockOptions{CancellationToken = cancellationToken});

    var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};

    // Producer subprocess
    producerQueue.LinkTo(recordDistributor, linkOptions);

    // Header subprocess
    recordDistributor.LinkTo(headerRowContainer, linkOptions);
    headerRowContainer.LinkTo(headerRowParser, linkOptions);
    headerRowParser.LinkTo(parsedHeaderContainer, linkOptions);
    parsedHeaderContainer.LinkTo(errorQueue, new DataflowLinkOptions{MaxMessages = 1, PropagateCompletion = true}, dataflowResult => !dataflowResult.WasSuccessful);

    // Parsing subprocess
    recordDistributor.LinkTo(parsedHeaderAndRecordJoiner, linkOptions);
    parsedHeaderAndRecordJoiner.LinkTo(entityParser, linkOptions, joiningResult => joiningResult.WasSuccessful);
    entityParser.LinkTo(entityDistributor, linkOptions);
    entityDistributor.LinkTo(errorQueue, linkOptions, dataflowResult => !dataflowResult.WasSuccessful);
}

HeaderRowParser

此块解析CSV文件中的标题行并进行一些验证。

代码语言:javascript
复制
public class HeaderRowParserFactory
{
    public TransformBlock<string, HeaderParsingResult> CreateHeaderRowParser(Type entityType,
        char delimiter,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        return new TransformBlock<string, HeaderParsingResult>(headerRow =>
        {
            // Set up some containers
            var result = new HeaderParsingResult(identifier: "N/A", wasSuccessful: true);
            var fieldIndexesByPropertyName = new Dictionary<string, int>();

            // Get all serializable properties on the chosen entity type
            var serializableProperties = entityType.GetProperties()
                .Where(prop => prop.IsDefined(typeof(CsvFieldNameAttribute), false))
                .ToList();

            // Add their CSV fieldnames to the result
            var entityFieldNames = serializableProperties.Select(prop => prop.GetCustomAttribute<CsvFieldNameAttribute>().FieldName);
            result.SetEntityFieldNames(entityFieldNames);

            // Create the dictionary of properties by field name
            var serializablePropertiesByFieldName = serializableProperties.ToDictionary(prop => prop.GetCustomAttribute<CsvFieldNameAttribute>().FieldName, prop => prop, StringComparer.OrdinalIgnoreCase);

            var fields = headerRow.Split(delimiter);

            for (var i = 0; i < fields.Length; i++)
            {
                // If any field in the CSV is unknown as a serializable property, we return a failed result
                if (!serializablePropertiesByFieldName.TryGetValue(fields[i], out var foundProperty))
                {
                    result.Invalidate($"The header row contains a field that does not match any of the serializable properties - {fields[i]}.",
                        DataflowErrorSeverity.Critical);
                    return result;
                }

                // Perform a bunch more validation

                fieldIndexesByPropertyName.Add(foundProperty.Name, i);
            }

            result.SetFieldIndexesByName(fieldIndexesByPropertyName);
            return result;
        }, dataflowBlockOptions ?? new ExecutionDataflowBlockOptions());
    }
}

ParsedHeaderAndRecordJoiner

对于通过管道的每个后续记录,此块用于检索已解析的头数据并将其添加到记录中。

代码语言:javascript
复制
public class ParsedHeaderAndRecordJoinerFactory
{
    public TransformBlock<string, HeaderAndRecordJoiningResult> CreateParsedHeaderAndRecordJoiner(WriteOnceBlock<HeaderParsingResult> parsedHeaderContainer, 
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        return new TransformBlock<string, HeaderAndRecordJoiningResult>(async csvRecord =>
            {
                var headerParsingResult = await parsedHeaderContainer.ReceiveAsync();

                // If the header couldn't be parsed, a critical error is already on its way to the failure logger so we don't need to continue
                if (!headerParsingResult.WasSuccessful) return new HeaderAndRecordJoiningResult(identifier: "N.A.", wasSuccessful: false, null, null);

                // The entity parser can't do anything with the header record, so we send a message with wasSuccessful false
                var isHeaderRecord = true;
                foreach (var entityFieldName in headerParsingResult.EntityFieldNames)
                {
                    isHeaderRecord &= csvRecord.Contains(entityFieldName);
                }
                if (isHeaderRecord) return new HeaderAndRecordJoiningResult(identifier: "N.A.", wasSuccessful: false, null, null);

                return new HeaderAndRecordJoiningResult(identifier: "N.A.", wasSuccessful: true, headerParsingResult, csvRecord);
            }, dataflowBlockOptions ?? new ExecutionDataflowBlockOptions());
    }
}

问题细节

对于当前的实现,ParsedHeaderAndRecordJoiner从对ParsedHeaderContainer的ReceiveAsync()调用中正确地接收数据并按预期返回,但是没有消息到达EntityParser。

另外,当一个完整的信号被发送到流的前端( ProducerQueue)时,它会传播到RecordDistributor,然后停止在ParsedHeaderAndRecordJoiner (它从HeaderRowContainer开始继续,所以RecordDistributor正在传递它)。

如果我自己移除ReceiveAsync()调用并模拟数据,则该块的行为与预期相同。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-09-07 16:40:21

我认为这部分是关键

但是,没有消息到达EntityParser。

根据示例,EntityParser不接收ParsedHeaderAndRecordJoiner输出的消息的唯一方法是当WasSuccessful返回false时。链接中使用的谓词排除了失败的消息,但这些消息没有去处,因此它们会在ParsedHeaderAndRecordJoiner输出缓冲区中生成,并且还会阻止Completion传播。您需要链接一个空目标来转储失败的消息。

代码语言:javascript
复制
parsedHeaderAndRecordJoiner.LinkTo(DataflowBlock.NullTarget<HeaderParsingResult>());

此外,如果模拟数据总是带着WasSuccessful true返回,那么这可能会指向await ...ReceiveAsync()

不一定是冒烟的枪,但是个很好的起点。当管道卡住时,您能确认ParsedHeaderAndRecordJoiner输出缓冲区中所有消息的状态吗?

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52219108

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档