首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >有资格的生产者消费者

有资格的生产者消费者
EN

Stack Overflow用户
提问于 2010-05-03 17:46:45
回答 1查看 3.3K关注 0票数 21

我是clojure的新手,我正试图理解如何正确地使用它的并发特性,因此,任何批评/建议都会受到赞赏。因此,我试图用clojure编写一个小的测试程序,其工作方式如下:

  1. 有5个生产商和2个消费者
  2. 生产者等待一个随机时间,然后将一个数字推入共享队列。
  3. 当队列不为空时,使用者应该从队列中取出一个数字,然后睡眠一小段时间来模拟工作。
  4. 当队列为空时,使用者应阻塞。
  5. 当队列中有超过4项时,生产者应该阻塞,以防止队列变得巨大。

以下是我对上述每一步的计划:

  1. 生产者和消费者将是不关心他们的状态的代理(只是零值或什么的);我只是使用代理在某个时候发送一个“消费者”或“生产者”函数。然后共享队列将是(def queue (ref []))。也许这应该是个原子?

  1. 在“生产者”代理函数中,简单地(线程/睡眠(rand-int 1000))和(dosync (alter conj (rand-int 100)推入队列。
  2. 我正在考虑让消费者代理使用添加监视器监视队列中的更改。不确定这个though..it是否会在任何变化中唤醒消费者,即使更改来自于一个消费者在做一些事情(可能使其为空)。也许在观察者函数中检查这一点就足够了。我看到的另一个问题是,如果所有的消费者都很忙,那么当生产者在队列中添加新的东西时会发生什么呢?被监视的事件是在某个消费者代理上排队还是消失了?
  3. 见上文
  4. 我真的不知道该怎么做。我听说clojure的seque可能很有用,但我找不到足够的文档来说明如何使用它,而且我的最初测试似乎不起作用(对不起,我已经没有代码了)

EN

回答 1

Stack Overflow用户

发布于 2010-05-03 18:22:42

这是我对它的看法。我只想使用Clojure数据结构来了解它是如何工作的。请注意,从Java工具箱中获取阻塞队列并在这里使用是非常常见和惯用的;我认为代码很容易修改。更新:实际上,我确实将其改编为java.util.concurrent.LinkedBlockingQueue,见下文。

clojure.lang.PersistentQueue

调用(pro-con)启动测试运行;然后查看output的内容,看看是否发生了任何事情,queue-lengths查看它们是否停留在给定的范围内。

更新:为了解释为什么我觉得需要在下面使用ensure (我在IRC上被问到这个问题),这是为了防止写错误(请参阅维基百科关于快照隔离的文章中的定义)。如果我用@queue代替(ensure queue),那么两个或多个生产者可能会检查队列的长度,发现队列长度小于4,然后在队列中放置额外的项,并可能使队列的总长度超过4,从而打破约束。类似地,两个执行@queue的消费者可以接受相同的项进行处理,然后从队列中弹出两个项。ensure防止这两种情况中的任何一种发生。

代码语言:javascript
复制
(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
(def *max-queue-length* 4)

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn queue-length-watch [_ _ _ new-queue-state]
  (dosync (alter queue-lengths conj (count new-queue-state))))

(add-watch queue :queue-length-watch queue-length-watch)

(defn producer [tag]
  (future
   (while @go-on?
     (if (dosync (let [l (count (ensure queue))]
                   (when (< l *max-queue-length*)
                     (alter queue conj tag)
                     true)))
       (Thread/sleep (rand-int 2000))))))

(defn consumer []
  (future
   (while @go-on?
     (Thread/sleep 100)       ; don't look at the queue too often
     (when-let [item (dosync (let [item (first (ensure queue))]
                               (alter queue pop)
                               item))]
       (Thread/sleep (rand-int 500))         ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))

java.util.concurrent.LinkedBlockingQueue

使用LinkedBlockingQueue编写的上述版本。注意代码的总体轮廓是如何基本相同的,有些细节实际上稍微清晰一些。我从这个版本中删除了queue-lengths,因为LBQ为我们处理了这个约束。

代码语言:javascript
复制
(def go-on? (atom true))
(def *max-queue-length* 4)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
(def output (ref ()))

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn producer [tag]
  (future
   (while @go-on?
     (.put queue tag)
     (Thread/sleep (rand-int 2000)))))

(defn consumer []
  (future
   (while @go-on?
     ;; I'm using .poll on the next line so as not to block
     ;; indefinitely if we're done; note that this has the
     ;; side effect that nulls = nils on the queue will not
     ;; be handled; there's a number of other ways to go about
     ;; this if this is a problem, see docs on LinkedBlockingQueue
     (when-let [item (.poll queue)]
       (Thread/sleep (rand-int 500)) ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))
票数 25
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/2760017

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档