首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用LIFO逻辑操作的MailboxProcessor

使用LIFO逻辑操作的MailboxProcessor
EN

Stack Overflow用户
提问于 2014-01-29 09:42:23
回答 3查看 685关注 0票数 5

我正在学习F#代理(MailboxProcessor)。

我正在处理一个相当非常规的问题。

  • 我有一个代理(dataSource),它是流数据的来源。数据必须由代理数组(dataProcessor)处理。我们可以将dataProcessor视为某种跟踪设备。
  • 数据可能比dataProcessor处理其输入的速度更快。
  • 耽搁一下是可以的。然而,我必须确保代理保持在其工作的顶端,并且不会在过时的观察下被堆放。

我正在探索解决这个问题的方法。

的第一个想法是在dataSource中实现堆栈 (LIFO)。当dataSource可以接收和处理数据时,dataProcessor将发送可用的最新观察结果。此解决方案可能有效,但可能会变得复杂,因为dataProcessor可能需要被阻塞和重新激活;并将其状态传递给dataSource,从而导致双向通信问题。这个问题可能归结为消费者生产者问题中的一个消费者生产者问题,但我不确定。

的第二个想法是让dataProcessor负责消息排序。在这种架构中,dataSource只需在dataProcessor的队列中发布更新。dataProcessor将使用Scan来获取队列中可用的最新数据。这可能是可行的。然而,我不确定在目前的MailboxProcessor设计中是否有可能清除消息队列,删除旧消息。此外,这里还写道:

不幸的是,TryScan函数在当前版本的F#中以两种方式中断。首先,重点是指定一个超时,但是实现实际上并没有遵守它。具体来说,不相关的消息重置计时器。其次,与其他扫描函数一样,在锁下检查消息队列,该锁防止任何其他线程在扫描期间投递,扫描时间可以是任意长的。因此,TryScan函数本身倾向于锁定并发系统,甚至可以引入死锁,因为调用者的代码是在锁内计算的(例如,从函数参数发送到扫描或TryScan时,当锁下的代码等待获得它已经在下的锁时,代理就会死锁)。

最近的观察结果反弹可能是个问题。这篇文章的作者乔恩·哈洛普认为

我设法围绕它进行架构设计,最终的体系结构实际上更好。本质上,我急切地使用自己的本地队列对所有消息进行Receive并进行筛选。

这个想法当然值得探讨,但是,在开始使用代码之前,我欢迎关于如何构造我的解决方案的一些输入。

谢谢。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2014-01-30 00:07:53

tl;I博士会尝试这样做:从FSharp.Actor或Zach的博客文章中获取邮箱实现,用ConcurrentStack替换ConcurrentQueue (加上添加一些有限制的容量逻辑),并使用这个经过更改的代理作为调度器,将消息从dataSource传递给作为普通MBPs或Actors实现的dataProcessors大军。

tl;dr2 --如果工作人员是一种稀缺而缓慢的资源,并且我们需要处理一条消息,这是工人准备就绪时的最新消息,那么所有这些都归结为一个具有堆栈的代理,而不是一个队列(具有某种有限制的容量逻辑)加上一个工作人员的BlockingQueue。Dispatcher将准备就绪的工作人员排出队列,然后从堆栈中弹出一条消息并将此消息发送给工作人员。工作完成后,工人在准备就绪时(例如,在let! msg = inbox.Receive()之前)将自己排队到队列中。然后Dispatcher使用者线程阻塞直到任何工作人员准备就绪,而生产者线程则保持有界堆栈的更新。(有界堆栈可以在锁内使用数组+偏移量+大小来完成,下面的堆栈太复杂了)

详细信息

MailBoxProcessor被设计成只有一个消费者。这甚至在MBP 这里的源代码中得到了注释(搜索单词“龙”:)

如果您将数据发布到MBP,那么只有一个线程可以从内部队列或堆栈中获取数据。在您特定的用例中,我将直接使用ConcurrentStack,或者更好地包装到BlockingCollection中。

  • 它将允许许多并发的消费者
  • 它非常快速,线程安全。
  • BlockingCollection具有BoundedCapacity属性,允许您限制集合的大小。它抛到Add上,但您可以捕获它或使用TryAdd。如果A是主堆栈,B是备用的,那么TryAdd to A,在false Add to B上将两者交换为Interlocked.Exchange,然后在A中处理所需的消息,清除它,做一个新的备用-或者使用三个堆栈,如果处理A可能超过B可能再次满;这样您不会阻塞和不丢失任何消息,但可以丢弃不需要的消息是一种受控的方式。

BlockingCollection有像AddToAny/TakeFromAny这样的方法,它们可以处理BlockingCollections数组。这可能会有所帮助,例如:

  • dataSource生成消息给具有ConcurrentStack实现的BlockingCollection (BCCS)
  • 另一个线程使用来自BCCS的消息并将它们发送到处理BCCS的数组中。你说过有很多数据。您可以牺牲一个线程来无限期地阻塞和发送您的消息。
  • 每个处理代理都有自己的BCCS或实现为代理/Actor/MBP,调度员向其发送消息。在您的示例中,只需要向一个processorAgent发送消息,因此可以将处理代理存储在循环缓冲区中,以便始终将消息发送给最近使用最少的处理器。

就像这样:

代码语言:javascript
复制
            (data stream produces 'T)
                |
            [dispatcher's BCSC]
                |
            (a dispatcher thread consumes 'T  and pushes to processors, manages capacity of BCCS and LRU queue)
                 |                               |
            [processor1's BCCS/Actor/MBP] ... [processorN's BCCS/Actor/MBP]
                 |                               |
               (process)                         (process)

与ConcurrentStack不同,您可能需要阅读有关堆数据结构的内容。如果您需要通过消息的某些属性(例如时间戳),而不是消息到达堆栈的顺序(例如,如果传输和到达顺序<>创建顺序可能出现延迟)来使用最新消息,则可以使用堆获取最新消息。

如果您仍然需要代理语义/API,那么除了Dave的链接之外,您还可以阅读多个源,并以某种方式对多个并发使用者采用实现:

  • 一个有趣的文章的扎克布雷关于有效的执行者的实现。在这里,您确实需要(在注释// Might want to schedule this call on another thread.下)将行execute true替换为行async { execute true } |> Async.Start或类似的代码,因为否则生成线程将消耗线程--对于单个快速生成器来说并不好。然而,对于上面描述的调度员来说,这正是所需要的。
  • FSharp.Actor (又名Fakka) 发展支部和FSharp MPB源代码(上面的第一个链接)对于实现细节可能非常有用。FSharp.Actors库已经冻结了几个月,但是开发分支中有一些活动。
  • 在这种情况下,不要错过Google组中的关于法卡的讨论

我有一个有点类似的用例,在过去的两天里,我研究了在F#代理/Actors上可以找到的所有东西。这个答案是我自己尝试这些想法的一种方式,其中一半是在写作过程中产生的。

票数 1
EN

Stack Overflow用户

发布于 2014-01-29 14:30:24

听起来您可能需要邮箱处理器的破坏性扫描版本,我在您可能感兴趣的博客系列中用TPL Dataflow实现了这一点。

我的博客目前已关闭,以维护,但我可以指向您的文章,以降价格式。

Part1

Part2

Part3

您还可以查看github上的代码。

我还在我潜伏的恐怖帖子中写了扫描的问题

希望能帮上忙..。

票数 2
EN

Stack Overflow用户

发布于 2014-02-04 16:10:28

最简单的解决办法是,在收到收件箱时贪婪地吃掉收件箱中的所有信息,并丢弃除最近的邮件以外的所有信息。易于使用TryReceive

代码语言:javascript
复制
let rec readLatestLoop oldMsg =
  async { let! newMsg = inbox.TryReceive 0
          match newMsg with
          | None -> oldMsg
          | Some newMsg -> return! readLatestLoop newMsg }
let readLatest() =
  async { let! msg = inbox.Receive()
          return! readLatestLoop msg }

面对同样的问题,我设计了一个更复杂、更高效的解决方案,我称之为可取消流,并在F#杂志的一篇文章这里中进行了描述。这样做的目的是开始处理消息,如果消息被取代,则取消该处理。如果正在进行重要的处理,这将大大提高并发性。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/21427195

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档