背景
我们有一个服务操作,可以接收并发异步请求,并且必须一次处理这些请求。
在下面的示例中,UploadAndImport(...)方法在多个线程上接收并发请求,但它对ImportFile(...)方法的调用必须一次一个。
外行人描述
想象一个有许多工作人员(多线程)的仓库。人们(客户)可以同时(同时)发送多个包(请求)。当一个包裹进来的时候,一个工人从开始到结束都要对它负责,而放下包裹的人可以离开(失忆)。工人们的工作是把每个包裹放进一个小槽里,只有一个工人可以一次把一个包裹放进一个槽里,否则就会引起混乱。如果落包的人稍后签入(轮询端点),那么仓库应该能够报告包是否掉下了滑道。
问题
接下来的问题是如何编写服务操作.
我们尝试了以下两种方法,并想知道两件事:
我们尝试过什么?
这是我们尝试过的服务代码。它的工作,虽然它感觉有点像一个黑客或传说。
static ImportFileInfo _inProgressRequest = null;
static readonly ConcurrentDictionary<Guid, ImportFileInfo> WaitingRequests =
new ConcurrentDictionary<Guid, ImportFileInfo>();
public void UploadAndImport(ImportFileInfo request)
{
// Receive the incoming request
WaitingRequests.TryAdd(request.OperationId, request);
while (null != Interlocked.CompareExchange(ref _inProgressRequest, request, null))
{
// Wait for any previous processing to complete
Thread.Sleep(500);
}
// Process the incoming request
ImportFile(request);
Interlocked.Exchange(ref _inProgressRequest, null);
WaitingRequests.TryRemove(request.OperationId, out _);
}
public bool UploadAndImportIsComplete(Guid operationId) =>
!WaitingRequests.ContainsKey(operationId);这是示例客户端代码。
private static async Task UploadFile(FileInfo fileInfo, ImportFileInfo importFileInfo)
{
using (var proxy = new Proxy())
using (var stream = new FileStream(fileInfo.FullName, FileMode.Open, FileAccess.Read))
{
importFileInfo.FileByteStream = stream;
proxy.UploadAndImport(importFileInfo);
}
await Task.Run(() => Poller.Poll(timeoutSeconds: 90, intervalSeconds: 1, func: () =>
{
using (var proxy = new Proxy())
{
return proxy.UploadAndImportIsComplete(importFileInfo.OperationId);
}
}));
}在Fiddle中很难写出这方面的最小可行示例,但是这是一个开始给出了一种意义,并进行了编译。
和以前一样,上面的内容似乎是一种黑客/传说,我们既询问了它的方法中潜在的缺陷,也询问了更适合/规范的替代模式。
发布于 2018-05-05 05:56:58
简单的解决方案,使用生产者-消费者模式管道请求在线程计数限制。
您仍然需要实现一个简单的进度报告或事件。我建议将昂贵的轮询方法替换为由微软SignalR库提供的异步通信。它使用WebSocket来启用异步行为。客户端和服务器可以在集线器上注册它们的回调。使用RPC,客户机现在可以调用服务器端方法,反之亦然。您将通过使用集线器(客户端)向客户端发布进度。在我的经验中,SignalR是非常简单的使用和非常好的文档。它为所有著名的服务器端语言(例如Java)提供了一个库。
据我理解,民意测验完全与“火与忘”相反。你不能忘记,因为你必须根据时间间隔来检查一些东西。基于事件的通信,如SignalR,是火灾和遗忘,因为你会收到提醒(因为你忘记了)。“事件端”将调用您的回调,而不是等待您自己执行它!
需求5被忽略了,因为我没有任何理由。等待一个线程完成将消除火和忘记字符。
private BlockingCollection<ImportFileInfo> requestQueue = new BlockingCollection<ImportFileInfo>();
private bool isServiceEnabled;
private readonly int maxNumberOfThreads = 8;
private Semaphore semaphore = new Semaphore(numberOfThreads);
private readonly object syncLock = new object();
public void UploadAndImport(ImportFileInfo request)
{
// Start the request handler background loop
if (!this.isServiceEnabled)
{
this.requestQueue?.Dispose();
this.requestQueue = new BlockingCollection<ImportFileInfo>();
// Fire and forget (requirement 4)
Task.Run(() => HandleRequests());
this.isServiceEnabled = true;
}
// Cache multiple incoming client requests (requirement 1) (and enable throttling)
this.requestQueue.Add(request);
}
private void HandleRequests()
{
while (!this.requestQueue.IsCompleted)
{
// Wait while thread limit is exceeded (some throttling)
this.semaphore.WaitOne();
// Process the incoming requests in a dedicated thread (requirement 2) until the BlockingCollection is marked completed.
Task.Run(() => ProcessRequest());
}
// Reset the request handler after BlockingCollection was marked completed
this.isServiceEnabled = false;
this.requestQueue.Dispose();
}
private void ProcessRequest()
{
ImportFileInfo request = this.requestQueue.Take();
UploadFile(request);
// You updated your question saying the method "ImportFile()" requires synchronization.
// This a bottleneck and will significantly drop performance, when this method is long running.
lock (this.syncLock)
{
ImportFile(request);
}
this.semaphore.Release();
}备注:
发布于 2018-05-05 10:43:07
问题是,您的总带宽非常小--一次只能运行一个作业--而且您希望处理并行请求。这意味着排队时间可能会大不相同。实现内存中的作业队列可能不是最好的选择,因为这会使您的系统变得更加脆弱,并且在业务增长时更难以扩展。
一种传统的、可扩展的建筑师方法是:
此解决方案要求您选择web服务器。一个常见的选择是运行ASP.NET的IIS。在这个平台上,每个请求都保证以单线程的方式处理(也就是说,您不需要过多地担心争用条件),但是由于一个名为螺纹敏捷性的特性,请求可能以一个不同的线程结束,但在原始的同步上下文中,这意味着除非您正在调试和检查线程ID,否则您可能永远不会注意到。
发布于 2018-05-07 17:12:49
考虑到系统的约束上下文,这是我们最后使用的实现:
static ImportFileInfo _importInProgressItem = null;
static readonly ConcurrentQueue<ImportFileInfo> ImportQueue =
new ConcurrentQueue<ImportFileInfo>();
public void UploadAndImport(ImportFileInfo request) {
UploadFile(request);
ImportFileSynchronized(request);
}
// Synchronize the file import,
// because the database allows a user to perform only one write at a time.
private void ImportFileSynchronized(ImportFileInfo request) {
ImportQueue.Enqueue(request);
do {
ImportQueue.TryPeek(out var next);
if (null != Interlocked.CompareExchange(ref _importInProgressItem, next, null)) {
// Queue processing is already under way in another thread.
return;
}
ImportFile(next);
ImportQueue.TryDequeue(out _);
Interlocked.Exchange(ref _importInProgressItem, null);
}
while (ImportQueue.Any());
}
public bool UploadAndImportIsComplete(Guid operationId) =>
ImportQueue.All(waiting => waiting.OperationId != operationId);这个解决方案对于我们期望的负载非常有效。该加载最多涉及15-20个并行PDF文件上载。多达15-20个文件的批次往往同时到达,然后安静了几个小时,直到下一批文件到达。
批评和反馈是最受欢迎的。
https://stackoverflow.com/questions/50184497
复制相似问题