我在使用 Polly 舱壁(Bulkhead)策略时遇到很多超时异常。该策略帮助我限制发送到特定主机的并发调用数量。但是,HttpClient 的超时时间似乎作用于整个委托(包括在舱壁中排队等待的时间)。
我使用IHttpClientFactory来配置以下代码:
services.AddHttpClient(string.Empty)
.AddPolicyHandler(GetBulkheadPolicy(100));
private static IAsyncPolicy<HttpResponseMessage> GetBulkheadPolicy(int maxConcurrentRequests)
{
return Policy.BulkheadAsync(maxConcurrentRequests, int.MaxValue)
.AsAsyncPolicy<HttpResponseMessage>();
}我的问题是,我希望超时只影响请求本身,而不影响舱壁策略,因为我想实现的行为如下:
(原文此处为期望行为的列表,在转载时已损坏丢失,大意为:超时只应从请求真正发出时开始计时,在舱壁中排队等待的时间不应计入超时,也不应因此抛出超时异常。)
我已经使用Semaphore而不是舱壁Polly策略来实现这种行为,但是我想使用策略封装该代码。
谢谢。
发布于 2021-07-12 15:49:58
我将这些示例放在一起,以演示对 HttpClient 请求进行节流的几种不同方案。我要强调的是,这些只是示例,与生产代码相去甚远,所以请带着这个前提仔细审查它们。
下面的示例代码展示了如何以"发后即忘"(fire-and-forget)的方式发出请求(也就是说调用方不关心响应)。这些方案假定请求量超过可用吞吐量。换句话说,生产者比消费者更快,这就是为什么需要某种排队机制来处理这种不平衡。
使用 BatchBlock 和 ActionBlock
public class ThrottlingWithBatchBlock
{
    static readonly HttpClient client = new();

    // Incoming requests are grouped into batches of 100 before being handed
    // to the consumer block. NOTE(review): a partial batch sits in the block
    // until it fills up (or TriggerBatch is called) — confirm that latency
    // is acceptable for the real workload.
    private readonly BatchBlock<HttpRequestMessage> requests = new(100);
    private ActionBlock<HttpRequestMessage[]> consumer;

    public ThrottlingWithBatchBlock()
    {
        // Up to 100 batches may be in flight at once; within a single batch
        // the requests are sent strictly one after another.
        consumer = new(
            batch => ConsumerAsync(batch),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
        requests.LinkTo(consumer);
    }

    // Queues a request for sending; completes once the batch block has accepted it.
    public async Task IssueNewRequest(HttpRequestMessage request) => await requests.SendAsync(request);

    // Sends every request of one batch sequentially (fire-and-forget: the
    // responses are not observed).
    private async Task ConsumerAsync(HttpRequestMessage[] batch)
    {
        foreach (var message in batch)
            await client.SendAsync(message).ConfigureAwait(false);
    }
}
带缓冲块
public class ThrottlingWithBufferBlock
{
    static readonly HttpClient client = new();

    // Bounded to 100 queued requests; SendAsync waits (applies back-pressure)
    // while the buffer is full.
    private readonly BufferBlock<HttpRequestMessage> requests = new(
        new DataflowBlockOptions { BoundedCapacity = 100 });

    public ThrottlingWithBufferBlock()
    {
        // Fire-and-forget consumer loop that drains the buffer for the
        // lifetime of this instance.
        _ = ConsumerAsync();
    }

    // Queues a request; completes once the buffer has accepted it.
    public async Task IssueNewRequest(HttpRequestMessage request) => await requests.SendAsync(request);

    // Single sequential consumer: receives and sends one request at a time,
    // ignoring the responses.
    async Task ConsumerAsync()
    {
        while (await requests.OutputAvailableAsync())
        {
            var next = await requests.ReceiveAsync();
            await client.SendAsync(next).ConfigureAwait(false);
        }
    }
}
有通道
public class ThrottlingWithChannels
{
    static readonly HttpClient client = new();

    // Bounded to 100 pending requests; WriteAsync waits while the channel is full.
    // NOTE(review): SingleWriter = true promises only one concurrent producer —
    // confirm IssueNewRequest is never called from multiple threads, or drop the flag.
    private readonly Channel<HttpRequestMessage> requests = Channel.CreateBounded<HttpRequestMessage>(
        new BoundedChannelOptions(100) { SingleWriter = true, SingleReader = false });

    public ThrottlingWithChannels()
    {
        // Fire-and-forget consumer loop for the lifetime of this instance.
        _ = ConsumerAsync();
    }

    // Queues a request; completes once the channel has accepted it.
    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        // Fix: the original awaited WaitToWriteAsync() first and ignored its
        // result. WriteAsync already waits for free capacity by itself, and the
        // separate wait was a check-then-act race (capacity observed by
        // WaitToWriteAsync could be taken before WriteAsync ran).
        await requests.Writer.WriteAsync(request);
    }

    // Sequential consumer: reads queued requests and sends them one at a
    // time, ignoring the responses.
    async Task ConsumerAsync()
    {
        while (await requests.Reader.WaitToReadAsync())
        {
            var request = await requests.Reader.ReadAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
带阻塞集
public class ThrottlingWithBlockingCollection
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithBlockingCollection()
    {
        // Fix: the original invoked ConsumerAsync() directly inside Select.
        // An async method runs synchronously until its first await, and the
        // first statement here is the blocking requests.Take() — so the
        // constructor itself blocked until a request arrived. Task.Run moves
        // the blocking Take() calls onto thread-pool threads and lets the
        // constructor return immediately.
        _ = Enumerable.Range(1, 100)
            .Select(_ => Task.Run(ConsumerAsync)).ToArray();
    }

    // Queues a request; Add never blocks because the collection is unbounded.
    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }

    // Each of the 100 consumers blocks on Take() and sends requests one by one.
    // NOTE(review): a failed SendAsync faults that consumer permanently —
    // consider a try/catch if resilience matters for the real workload.
    async Task ConsumerAsync()
    {
        while (true)
        {
            var request = requests.Take();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
与平行Foreach
public class ThrottlingWithParallelForEach
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithParallelForEach()
    {
        // Fix 1: enumerating the BlockingCollection directly only iterates a
        // snapshot of its current contents (empty at construction time) and
        // never sees later additions; GetConsumingEnumerable() blocks for new
        // items and removes them as they are consumed.
        // Fix 2: the original started the pump synchronously, so the
        // constructor would block on the first MoveNext() of the consuming
        // enumerable; Task.Run keeps that off the constructor's thread.
        _ = Task.Run(() => requests.GetConsumingEnumerable()
            .ParallelAsyncForEach(async request => await client.SendAsync(request).ConfigureAwait(false), 100));
    }

    // Queues a request; Add never blocks because the collection is unbounded.
    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }
}
//Based on https://codereview.stackexchange.com/a/203487
public static partial class ParallelForEach
{
    /// <summary>
    /// Runs <paramref name="body"/> over every element of <paramref name="source"/>
    /// with at most <paramref name="degreeOfParallelism"/> tasks in flight at once.
    /// </summary>
    /// <param name="source">Elements to process; enumerated lazily, one element per started task.</param>
    /// <param name="body">Asynchronous operation invoked for each element.</param>
    /// <param name="degreeOfParallelism">Maximum number of concurrently running tasks; must be positive.</param>
    public static async Task ParallelAsyncForEach<T>(this IEnumerable<T> source, Func<T, Task> body, int degreeOfParallelism)
    {
        if (source is null) throw new ArgumentNullException(nameof(source));
        if (body is null) throw new ArgumentNullException(nameof(body));
        if (degreeOfParallelism <= 0) throw new ArgumentOutOfRangeException(nameof(degreeOfParallelism));

        // A List (not a HashSet, as in the original) so that duplicate Task
        // instances — e.g. a body that always returns Task.CompletedTask —
        // are counted once per started job. With a HashSet, duplicates
        // collapsed and Count could never reach degreeOfParallelism.
        var inFlight = new List<Task>(degreeOfParallelism);
        using var enumerator = source.GetEnumerator();  // fix: dispose the enumerator

        // Starts the next job if any remain; returns false once the source is exhausted.
        bool TryStartNextJob()
        {
            if (!enumerator.MoveNext())
                return false;
            inFlight.Add(body(enumerator.Current));
            return true;
        }

        // Prime up to degreeOfParallelism jobs. Fix: the original looped on
        // Count < degreeOfParallelism alone, which spins forever when the
        // source has fewer elements than degreeOfParallelism.
        while (inFlight.Count < degreeOfParallelism && TryStartNextJob())
        {
        }

        // Replace each completed job with a new one until everything is done.
        while (inFlight.Count > 0)
        {
            Task completed = await Task.WhenAny(inFlight).ConfigureAwait(false);
            inFlight.Remove(completed);
            TryStartNextJob();
        }
    }
}
https://stackoverflow.com/questions/68266935
复制相似问题