首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从Parallel.For SendAsync到BufferBlock到异步转换?

从Parallel.For SendAsync到BufferBlock到异步转换?
EN

Stack Overflow用户
提问于 2017-05-20 12:45:21
回答 2查看 759关注 0票数 2

我正在学习TPL Dataflow图书馆。到目前为止这正是我要找的。

我创建了一个执行以下功能的简单类(如下所示)

  • 在执行ImportPropertiesForBranch时,我转到第三方api并获得属性列表
  • 返回xml列表并将其反序列化为属性数据数组(id、api端点、最后更新)。400+属性(如在房屋中)是存在的。
  • 然后使用一个Parallel.For将属性数据SendAsync到我的propertyBufferBlock中。
  • propertyBufferBlock链接到一个propertyXmlBlock (它本身就是一个TransformBlock)。
  • 然后,propertyXmlBlock (异步)返回到API (使用属性数据中提供的api端点),并获取属性xml以进行反序列化。
  • 一旦xml返回并变得可用,我们就可以反序列化。
  • 稍后,我将添加更多的TransformBlock来将其持久化到数据存储中。

所以我的问题是;

  • 是否有任何潜在的瓶颈或代码领域可能是麻烦的?我知道我没有包括任何错误处理或取消(这是即将到来)。
  • await异步调用可以在TransformBlock中进行吗?还是这是一个瓶颈?
  • 虽然代码工作正常,但我担心Parallel.ForBufferBlock和异步在TransformBlock中的缓冲和异步性。我不确定这是最好的方法,我可能混淆了一些概念。

欢迎任何指导、改进和陷阱建议。

代码语言:javascript
复制
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using My.Interfaces;
using My.XmlService.Models;

namespace My.ImportService
{
    public class ImportService
    {

        private readonly IApiService _apiService;
        private readonly IXmlService _xmlService;
        private readonly IRepositoryService _repositoryService;

        public ImportService(IApiService apiService,
            IXmlService xmlService,
            IRepositoryService repositoryService)
        {
            _apiService = apiService;
            _xmlService = xmlService;
            _repositoryService = repositoryService;

            ConstructPipeline();
        }

        private BufferBlock<propertiesProperty> propertyBufferBlock;
        private TransformBlock<propertiesProperty, string> propertyXmlBlock;
        private TransformBlock<string, propertyType> propertyDeserializeBlock;
        private ActionBlock<propertyType> propertyCompleteBlock;

        public async Task<bool> ImportPropertiesForBranch(string branchName, int branchUrlId)
        {
            var propertyListXml = await _apiService.GetPropertyListAsync(branchUrlId);

            if (string.IsNullOrEmpty(propertyListXml))
                return false;

            var properties = _xmlService.DeserializePropertyList(propertyListXml);

            if (properties?.property == null || properties.property.Length == 0)
                return false;

            // limited to the first 20 for testing
            Parallel.For(0, 20,
                new ParallelOptions {MaxDegreeOfParallelism = 3},
                i => propertyBufferBlock.SendAsync(properties.property[i]));

            propertyBufferBlock.Complete();

            await propertyCompleteBlock.Completion;

            return true;
        }

        private void ConstructPipeline()
        {
            propertyBufferBlock = GetPropertyBuffer();
            propertyXmlBlock = GetPropertyXmlBlock();
            propertyDeserializeBlock = GetPropertyDeserializeBlock();
            propertyCompleteBlock = GetPropertyCompleteBlock();

            propertyBufferBlock.LinkTo(
                propertyXmlBlock,
                new DataflowLinkOptions {PropagateCompletion = true});

            propertyXmlBlock.LinkTo(
                propertyDeserializeBlock,
                new DataflowLinkOptions {PropagateCompletion = true});

            propertyDeserializeBlock.LinkTo(
                propertyCompleteBlock,
                new DataflowLinkOptions {PropagateCompletion = true});
        }

        private BufferBlock<propertiesProperty> GetPropertyBuffer()
        {
            return new BufferBlock<propertiesProperty>();
        }

        private TransformBlock<propertiesProperty, string> GetPropertyXmlBlock()
        {
            return new TransformBlock<propertiesProperty, string>(async propertiesProperty =>
                {
                    Debug.WriteLine($"getting xml {propertiesProperty.prop_id}");
                    var propertyXml = await _apiService.GetXmlAsStringAsync(propertiesProperty.url);
                    return propertyXml;
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }

        private TransformBlock<string, propertyType> GetPropertyDeserializeBlock()
        {
            return new TransformBlock<string, propertyType>(xmlAsString =>
                {
                    Debug.WriteLine($"deserializing");
                    var propertyType = _xmlService.DeserializeProperty(xmlAsString);
                    return propertyType;
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }

        private ActionBlock<propertyType> GetPropertyCompleteBlock()
        {
            return new ActionBlock<propertyType>(propertyType =>
                {
                    Debug.WriteLine($"complete {propertyType.id}");
                    Debug.WriteLine(propertyType.address.display);
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }
    }
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-05-20 13:58:22

是否有任何潜在的瓶颈或代码领域可能是麻烦的?

一般来说,您的方法看起来不错,潜在的瓶颈在于您限制了使用MaxDegreeOfParallelism = 1对块进行并行处理。根据对问题的描述,可以独立地处理每个项,这就是为什么您可以一次处理多个项的原因。

TransformBlock中等待异步调用可以吗?还是这是一个瓶颈?

这是非常好的,因为TPL DataFlow支持异步操作。

虽然代码工作正常,但我担心Parallel.ForBufferBlock和异步在TransformBlock中的缓冲和异步性。我不确定这是最好的方法,我可能混淆了一些概念。

首先,代码中潜在的问题可能会让您自暴自弃,那就是在Parallel.For中调用异步方法,然后调用propertyBufferBlock.Complete();。这里的问题是,Parallel.For不支持异步操作,调用异步操作的方式是在返回任务完成之前调用propertyBufferBlock.SendAsync并继续前进。这意味着当Parallel.For退出时,某些操作可能仍处于运行状态,并且项尚未添加到缓冲区块中。如果然后调用propertyBufferBlock.Complete();,这些挂起的项将抛出异常,并且不会将项添加到处理中。你会得到未被观察到的异常。

可以使用ForEachAsync form 这篇博客文章确保在完成块之前将所有项添加到块中。但是,如果您仍然将处理限制为1操作,您可以一次只添加一个项。我不知道propertyBufferBlock.SendAsync是如何实现的,但是可以在内部限制每次添加一个项,所以并行添加没有任何意义。

票数 2
EN

Stack Overflow用户

发布于 2017-05-20 20:56:11

你实际上做了一些错误的事情:

代码语言:javascript
复制
i => propertyBufferBlock.SendAsync(properties.property[i])

您需要await方法,否则您将创建太多的同时任务。

也是这一行:

代码语言:javascript
复制
MaxDegreeOfParallelism = 1

将块的执行限制在相应的执行范围内,这会降低性能。

正如您在注释中所说的,您切换到了同步方法Post,并通过设置BoundedCapacity限制了块的容量。使用此变体时应谨慎,因为您需要检查它的返回值,该值声明消息是否已被接受。

至于您对等待块中的async方法的担忧--这是绝对可以的,并且应该像在使用async方法的其他情况中那样做。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44086034

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档