首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >HttpClient TimeOut与Polly舱壁政策问题

HttpClient TimeOut与Polly舱壁政策问题
EN

Stack Overflow用户
提问于 2021-07-06 08:16:54
回答 1查看 732关注 0票数 2

我有很多超时异常使用Polly舱壁策略,这个策略帮助我限制我发送到特定主机的并发调用的数量。但是,HttpClient超时时间似乎会影响整个委托。

我使用IHttpClientFactory来配置以下代码:

代码语言:javascript
复制
services.AddHttpClient(string.Empty)
.AddPolicyHandler(GetBulkheadPolicy(100));


private static IAsyncPolicy<HttpResponseMessage> GetBulkheadPolicy(int maxConcurrentRequests)
{
    return Policy.BulkheadAsync(maxConcurrentRequests, int.MaxValue)
        .AsAsyncPolicy<HttpResponseMessage>();
}

我的问题是,我希望超时只影响请求本身,而不影响舱壁策略,因为我想实现的行为如下:

exception)

  • Send

  • 将并发请求的数量限制在特定主机上,

  • 无限地等待直到有能力发送请求(当queque将满时,Polly将向主机引发请求并应用超时,例如,默认超时。

)

我已经使用Semaphore而不是舱壁Polly策略来实现这种行为,但是我想使用策略封装该代码。

谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-07-12 15:49:58

我将这些示例放在一起,以演示如何对HttpClient请求执行节流操作的不同选项。我要强调的是,这些只是例子,与生产代码相去甚远,所以请通过那个玻璃仔细检查它们。

下面的示例代码展示了如何在火灾中发出请求,并忘记了(这样他们就不关心响应了)。这些解决方案假定请求比可用吞吐量多。换句话说,生产者比消费者更快,这就是为什么有某种排队机制来处理这种不平衡。

带Back和Action块

代码语言:javascript
复制
public class ThrottlingWithBatchBlock
{
    static readonly HttpClient client = new();
    private readonly BatchBlock<HttpRequestMessage> requests = new(100);
    private ActionBlock<HttpRequestMessage[]> consumer;

    public ThrottlingWithBatchBlock()
    {
        consumer = new(
            reqs => ConsumerAsync(reqs),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
        requests.LinkTo(consumer);
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    private async Task ConsumerAsync(HttpRequestMessage[] requests)
    {
        foreach (var request in requests)
            await client.SendAsync(request).ConfigureAwait(false);
    }
}

带缓冲块

代码语言:javascript
复制
public class ThrottlingWithBufferBlock
{
    static readonly HttpClient client = new();
    private readonly BufferBlock<HttpRequestMessage> requests = new(
            new DataflowBlockOptions { BoundedCapacity = 100 });

    public ThrottlingWithBufferBlock()
    {
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    async Task ConsumerAsync()
    {
        while (await requests.OutputAvailableAsync())
        {
            var request = await requests.ReceiveAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}

有通道

代码语言:javascript
复制
public class ThrottlingWithChannels
{
    static readonly HttpClient client = new();
    private Channel<HttpRequestMessage> requests = Channel.CreateBounded<HttpRequestMessage>(
            new BoundedChannelOptions(100) { SingleWriter = true, SingleReader = false });

    public ThrottlingWithChannels()
    {
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.Writer.WaitToWriteAsync();
        await requests.Writer.WriteAsync(request);
    }

    async Task ConsumerAsync()
    {
        while (await requests.Reader.WaitToReadAsync())
        {
            var request = await requests.Reader.ReadAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}

带阻塞集

代码语言:javascript
复制
public class ThrottlingWithBlockingCollection
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithBlockingCollection()
    {
        _ = Enumerable.Range(1, 100)
            .Select(_ => ConsumerAsync()).ToArray();
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }

    async Task ConsumerAsync()
    {
        while (true)
        {
            var request = requests.Take();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}

与平行Foreach

代码语言:javascript
复制
public class ThrottlingWithParallelForEach
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithParallelForEach()
    {
        _ = requests.ParallelAsyncForEach(async request => await client.SendAsync(request).ConfigureAwait(false), 100);
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }
}
代码语言:javascript
复制
//Based on https://codereview.stackexchange.com/a/203487
public static partial class ParallelForEach
{
    public static async Task ParallelAsyncForEach<T>(this IEnumerable<T> source, Func<T, Task> body, int degreeOfParallelism)
    {
        var toBeProcessedJobs = new HashSet<Task>();
        var remainingJobsEnumerator = source.GetEnumerator();

        void AddNewJob()
        {
            if (remainingJobsEnumerator.MoveNext())
            {
                var readyToProcessJob = body(remainingJobsEnumerator.Current);
                toBeProcessedJobs.Add(readyToProcessJob);
            }
        }

        while (toBeProcessedJobs.Count < degreeOfParallelism)
        {
            AddNewJob();
        }

        while (toBeProcessedJobs.Count > 0)
        {
            Task processed = await Task.WhenAny(toBeProcessedJobs).ConfigureAwait(false);
            toBeProcessedJobs.Remove(processed);
            AddNewJob();
        }

        return;
    }
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68266935

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档