首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >接收并发异步请求并一次处理它们

接收并发异步请求并一次处理它们
EN

Stack Overflow用户
提问于 2018-05-05 00:09:37
回答 3查看 798关注 0票数 2

背景

我们有一个服务操作,可以接收并发异步请求,并且必须一次处理这些请求。

在下面的示例中,UploadAndImport(...)方法在多个线程上接收并发请求,但它对ImportFile(...)方法的调用必须一次一个。

外行人描述

想象一个有许多工作人员(多线程)的仓库。人们(客户)可以同时(同时)发送多个包(请求)。当一个包裹进来的时候,一个工人从开始到结束都要对它负责,而放下包裹的人可以离开(失忆)。工人们的工作是把每个包裹放进一个小槽里,只有一个工人可以一次把一个包裹放进一个槽里,否则就会引起混乱。如果落包的人稍后签入(轮询端点),那么仓库应该能够报告包是否掉下了滑道。

问题

接下来的问题是如何编写服务操作.

  1. 可以接收并发客户端请求,
  2. 在多个线程上接收和处理这些请求,
  3. 在接收请求的同一线程上处理请求,
  4. 每次处理一个请求,
  5. 是一次单向的火灾和遗忘行动,而且
  6. 具有一个单独的轮询终结点,该端点在请求完成时报告。

我们尝试了以下两种方法,并想知道两件事:

  1. 是否有我们没有考虑过的比赛条件?
  2. 是否有更规范的方法在C#.NET中使用面向服务的体系结构(我们碰巧使用WCF)来编写这个场景?

我们尝试过什么?

这是我们尝试过的服务代码。它的工作,虽然它感觉有点像一个黑客或传说。

代码语言:javascript
复制
static ImportFileInfo _inProgressRequest = null;

static readonly ConcurrentDictionary<Guid, ImportFileInfo> WaitingRequests = 
    new ConcurrentDictionary<Guid, ImportFileInfo>();

public void UploadAndImport(ImportFileInfo request)
{
    // Receive the incoming request
    WaitingRequests.TryAdd(request.OperationId, request);

    while (null != Interlocked.CompareExchange(ref _inProgressRequest, request, null))
    {
        // Wait for any previous processing to complete
        Thread.Sleep(500);
    }

    // Process the incoming request
    ImportFile(request);

    Interlocked.Exchange(ref _inProgressRequest, null);
    WaitingRequests.TryRemove(request.OperationId, out _);
}

public bool UploadAndImportIsComplete(Guid operationId) => 
    !WaitingRequests.ContainsKey(operationId);

这是示例客户端代码。

代码语言:javascript
复制
private static async Task UploadFile(FileInfo fileInfo, ImportFileInfo importFileInfo)
{
    using (var proxy = new Proxy())
    using (var stream = new FileStream(fileInfo.FullName, FileMode.Open, FileAccess.Read))
    {
        importFileInfo.FileByteStream = stream;
        proxy.UploadAndImport(importFileInfo);
    }

    await Task.Run(() => Poller.Poll(timeoutSeconds: 90, intervalSeconds: 1, func: () =>
    {
        using (var proxy = new Proxy())
        {
            return proxy.UploadAndImportIsComplete(importFileInfo.OperationId);
        }
    }));
}

在Fiddle中很难写出这方面的最小可行示例,但是这是一个开始给出了一种意义,并进行了编译。

和以前一样,上面的内容似乎是一种黑客/传说,我们既询问了它的方法中潜在的缺陷,也询问了更适合/规范的替代模式。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-05-05 05:56:58

简单的解决方案,使用生产者-消费者模式管道请求在线程计数限制。

您仍然需要实现一个简单的进度报告或事件。我建议将昂贵的轮询方法替换为由微软SignalR库提供的异步通信。它使用WebSocket来启用异步行为。客户端和服务器可以在集线器上注册它们的回调。使用RPC,客户机现在可以调用服务器端方法,反之亦然。您将通过使用集线器(客户端)向客户端发布进度。在我的经验中,SignalR是非常简单的使用和非常好的文档。它为所有著名的服务器端语言(例如Java)提供了一个库。

据我理解,民意测验完全与“火与忘”相反。你不能忘记,因为你必须根据时间间隔来检查一些东西。基于事件的通信,如SignalR,是火灾和遗忘,因为你会收到提醒(因为你忘记了)。“事件端”将调用您的回调,而不是等待您自己执行它!

需求5被忽略了,因为我没有任何理由。等待一个线程完成将消除火和忘记字符。

代码语言:javascript
复制
private BlockingCollection<ImportFileInfo> requestQueue = new BlockingCollection<ImportFileInfo>();
private bool isServiceEnabled;
private readonly int maxNumberOfThreads = 8;
private Semaphore semaphore = new Semaphore(numberOfThreads);
private readonly object syncLock = new object();

public void UploadAndImport(ImportFileInfo request) 
{            
  // Start the request handler background loop
  if (!this.isServiceEnabled)
  {
    this.requestQueue?.Dispose();
    this.requestQueue = new BlockingCollection<ImportFileInfo>();

    // Fire and forget (requirement 4)
    Task.Run(() => HandleRequests());
    this.isServiceEnabled = true;
  }

  // Cache multiple incoming client requests (requirement 1) (and enable throttling)
  this.requestQueue.Add(request);
}

private void HandleRequests()
{
  while (!this.requestQueue.IsCompleted)
  {
    // Wait while thread limit is exceeded (some throttling)
    this.semaphore.WaitOne();

    // Process the incoming requests in a dedicated thread (requirement 2) until the BlockingCollection is marked completed.
    Task.Run(() => ProcessRequest());
  }

  // Reset the request handler after BlockingCollection was marked completed
  this.isServiceEnabled = false;
  this.requestQueue.Dispose();
}

private void ProcessRequest()
{
  ImportFileInfo request = this.requestQueue.Take();
  UploadFile(request);

  // You updated your question saying the method "ImportFile()" requires synchronization.
  // This a bottleneck and will significantly drop performance, when this method is long running. 
  lock (this.syncLock)
  {
    ImportFile(request);
   }

  this.semaphore.Release();
}

备注:

  • BlockingCollection是IDisposable
  • TODO:您必须“关闭”BlockingCollection,将其标记为"BlockingCollection.CompleteAdding()“,否则它将不确定地循环等待进一步的请求。也许您为客户端引入了一个额外的请求方法,用于取消和/或更新流程,并将添加到BlockingCollection的标记标记为已完成。或定时器在标记为已完成之前等待空闲时间。或者使请求处理程序线程阻塞或旋转。
  • 替换接受()并添加(.)与TryTake(.)和TryAdd(.)如果您想要取消支持
  • 代码未被测试
  • 您的"ImportFile()“方法是多线程环境中的瓶颈。我建议让它安全。在需要同步的I/O情况下,我会将数据缓存在一个BlockingCollection中,然后一个一个地将它们写入I/O。
票数 2
EN

Stack Overflow用户

发布于 2018-05-05 10:43:07

问题是,您的总带宽非常小--一次只能运行一个作业--而且您希望处理并行请求。这意味着排队时间可能会大不相同。实现内存中的作业队列可能不是最好的选择,因为这会使您的系统变得更加脆弱,并且在业务增长时更难以扩展。

一种传统的、可扩展的建筑师方法是:

  • 接受请求的HTTP服务,负载平衡/冗余,没有会话状态。
  • 一个SQL Server数据库,用于将请求持久化到队列中,并返回持久的唯一作业ID。
  • 用于处理队列的Windows服务,每次只处理一个作业,并将作业标记为已完成。服务的工作进程可能是单线程的。

此解决方案要求您选择web服务器。一个常见的选择是运行ASP.NET的IIS。在这个平台上,每个请求都保证以单线程的方式处理(也就是说,您不需要过多地担心争用条件),但是由于一个名为螺纹敏捷性的特性,请求可能以一个不同的线程结束,但在原始的同步上下文中,这意味着除非您正在调试和检查线程ID,否则您可能永远不会注意到。

票数 1
EN

Stack Overflow用户

发布于 2018-05-07 17:12:49

考虑到系统的约束上下文,这是我们最后使用的实现:

代码语言:javascript
复制
static ImportFileInfo _importInProgressItem = null;

static readonly ConcurrentQueue<ImportFileInfo> ImportQueue = 
    new ConcurrentQueue<ImportFileInfo>();

public void UploadAndImport(ImportFileInfo request) {
    UploadFile(request);
    ImportFileSynchronized(request);
}

// Synchronize the file import, 
// because the database allows a user to perform only one write at a time.
private void ImportFileSynchronized(ImportFileInfo request) {
    ImportQueue.Enqueue(request);
    do {
        ImportQueue.TryPeek(out var next);
        if (null != Interlocked.CompareExchange(ref _importInProgressItem, next, null)) {
            // Queue processing is already under way in another thread.
            return;
        }

        ImportFile(next);
        ImportQueue.TryDequeue(out _);
        Interlocked.Exchange(ref _importInProgressItem, null);
    }
    while (ImportQueue.Any());
}

public bool UploadAndImportIsComplete(Guid operationId) =>
    ImportQueue.All(waiting => waiting.OperationId != operationId);

这个解决方案对于我们期望的负载非常有效。该加载最多涉及15-20个并行PDF文件上载。多达15-20个文件的批次往往同时到达,然后安静了几个小时,直到下一批文件到达。

批评和反馈是最受欢迎的。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50184497

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档