我正在学习F#代理(MailboxProcessor)。
我正在处理一个相当非常规的问题。
dataSource),它是流数据的来源。数据必须由代理数组(dataProcessor)处理。我们可以将dataProcessor视为某种跟踪设备。dataProcessor处理其输入的速度更快。我正在探索解决这个问题的方法。
的第一个想法是在dataSource中实现堆栈 (LIFO)。当dataSource可以接收和处理数据时,dataProcessor将发送可用的最新观察结果。此解决方案可能有效,但可能会变得复杂,因为dataProcessor可能需要被阻塞和重新激活;并将其状态传递给dataSource,从而导致双向通信问题。这个问题可能归结为消费者生产者问题中的一个消费者生产者问题,但我不确定。
的第二个想法是让dataProcessor负责消息排序。在这种架构中,dataSource只需在dataProcessor的队列中发布更新。dataProcessor将使用Scan来获取队列中可用的最新数据。这可能是可行的。然而,我不确定在目前的MailboxProcessor设计中是否有可能清除消息队列,删除旧消息。此外,这里还写道:
不幸的是,TryScan函数在当前版本的F#中以两种方式中断。首先,重点是指定一个超时,但是实现实际上并没有遵守它。具体来说,不相关的消息重置计时器。其次,与其他扫描函数一样,在锁下检查消息队列,该锁防止任何其他线程在扫描期间投递,扫描时间可以是任意长的。因此,TryScan函数本身倾向于锁定并发系统,甚至可以引入死锁,因为调用者的代码是在锁内计算的(例如,从函数参数发送到扫描或TryScan时,当锁下的代码等待获得它已经在下的锁时,代理就会死锁)。
最近的观察结果反弹可能是个问题。这篇文章的作者乔恩·哈洛普认为
我设法围绕它进行架构设计,最终的体系结构实际上更好。本质上,我急切地使用自己的本地队列对所有消息进行
Receive并进行筛选。
这个想法当然值得探讨,但是,在开始使用代码之前,我欢迎关于如何构造我的解决方案的一些输入。
谢谢。
发布于 2014-01-30 00:07:53
tl;I博士会尝试这样做:从FSharp.Actor或Zach的博客文章中获取邮箱实现,用ConcurrentStack替换ConcurrentQueue (加上添加一些有限制的容量逻辑),并使用这个经过更改的代理作为调度器,将消息从dataSource传递给作为普通MBPs或Actors实现的dataProcessors大军。
tl;dr2 --如果工作人员是一种稀缺而缓慢的资源,并且我们需要处理一条消息,这是工人准备就绪时的最新消息,那么所有这些都归结为一个具有堆栈的代理,而不是一个队列(具有某种有限制的容量逻辑)加上一个工作人员的BlockingQueue。Dispatcher将准备就绪的工作人员排出队列,然后从堆栈中弹出一条消息并将此消息发送给工作人员。工作完成后,工人在准备就绪时(例如,在let! msg = inbox.Receive()之前)将自己排队到队列中。然后Dispatcher使用者线程阻塞直到任何工作人员准备就绪,而生产者线程则保持有界堆栈的更新。(有界堆栈可以在锁内使用数组+偏移量+大小来完成,下面的堆栈太复杂了)
详细信息
MailBoxProcessor被设计成只有一个消费者。这甚至在MBP 这里的源代码中得到了注释(搜索单词“龙”:)
如果您将数据发布到MBP,那么只有一个线程可以从内部队列或堆栈中获取数据。在您特定的用例中,我将直接使用ConcurrentStack,或者更好地包装到BlockingCollection中。
BlockingCollection具有BoundedCapacity属性,允许您限制集合的大小。它抛到Add上,但您可以捕获它或使用TryAdd。如果A是主堆栈,B是备用的,那么TryAdd to A,在false Add to B上将两者交换为Interlocked.Exchange,然后在A中处理所需的消息,清除它,做一个新的备用-或者使用三个堆栈,如果处理A可能超过B可能再次满;这样您不会阻塞和不丢失任何消息,但可以丢弃不需要的消息是一种受控的方式。BlockingCollection有像AddToAny/TakeFromAny这样的方法,它们可以处理BlockingCollections数组。这可能会有所帮助,例如:
就像这样:
(data stream produces 'T)
|
[dispatcher's BCSC]
|
(a dispatcher thread consumes 'T and pushes to processors, manages capacity of BCCS and LRU queue)
| |
[processor1's BCCS/Actor/MBP] ... [processorN's BCCS/Actor/MBP]
| |
(process) (process)与ConcurrentStack不同,您可能需要阅读有关堆数据结构的内容。如果您需要通过消息的某些属性(例如时间戳),而不是消息到达堆栈的顺序(例如,如果传输和到达顺序<>创建顺序可能出现延迟)来使用最新消息,则可以使用堆获取最新消息。
如果您仍然需要代理语义/API,那么除了Dave的链接之外,您还可以阅读多个源,并以某种方式对多个并发使用者采用实现:
// Might want to schedule this call on another thread.下)将行execute true替换为行async { execute true } |> Async.Start或类似的代码,因为否则生成线程将消耗线程--对于单个快速生成器来说并不好。然而,对于上面描述的调度员来说,这正是所需要的。Fakka) 发展支部和FSharp MPB源代码(上面的第一个链接)对于实现细节可能非常有用。FSharp.Actors库已经冻结了几个月,但是开发分支中有一些活动。我有一个有点类似的用例,在过去的两天里,我研究了在F#代理/Actors上可以找到的所有东西。这个答案是我自己尝试这些想法的一种方式,其中一半是在写作过程中产生的。
发布于 2014-01-29 14:30:24
发布于 2014-02-04 16:10:28
最简单的解决办法是,在收到收件箱时贪婪地吃掉收件箱中的所有信息,并丢弃除最近的邮件以外的所有信息。易于使用TryReceive
let rec readLatestLoop oldMsg =
async { let! newMsg = inbox.TryReceive 0
match newMsg with
| None -> oldMsg
| Some newMsg -> return! readLatestLoop newMsg }
let readLatest() =
async { let! msg = inbox.Receive()
return! readLatestLoop msg }面对同样的问题,我设计了一个更复杂、更高效的解决方案,我称之为可取消流,并在F#杂志的一篇文章这里中进行了描述。这样做的目的是开始处理消息,如果消息被取代,则取消该处理。如果正在进行重要的处理,这将大大提高并发性。
https://stackoverflow.com/questions/21427195
复制相似问题