我目前正在试图为以下问题找到一个lispy/工作解决方案:
作业队列提供一组相等的线程(通过它们的代码),其中包含它们应该处理的任务。如果队列是空的,则线程将等待新条目的生成,但我也希望提供一个干净的关机。因此,即使在等待队列时,母线程也必须设置一些变量/调用线程并让它们关闭。他们不会直接遵守的唯一原因应该是线程目前正在评估任务,因此在任务完成之前,线程是繁忙/无法完全关闭的。
我目前有两种解决方案,我不太相信:
(defparameter *kill-yourself* nil)
(defparameter *mutex* (sb-thread:make-mutex))
(defparameter *notify* (sb-thread:make-waitqueue))
#|the queue is thread safe|#
(defparameter *job-queue* (make-instance 'queue))
(defun fill-queue (with-data)
(fill-stuff-in-queue)
(sb-thread:with-mutex (*mutex*)
(sb-thread:condition-notify *notify*)))
#|solution A|#
(with-mutex (*mutex*)
(do ((curr-job nil))
(*kill-yourself* nil)
(if (is-empty *job-queue*)
(sb-thread:condition-wait *notify* *mutex*)
(progn
(setf curr-job (dequeue *job-queue*))
(do-stuff-with-job)))))
#|solution B|#
(defun helper-kill-yourself-p ()
(sb-thread:with-mutex (*mutex*)
*kill-yourself*))
(do ((job (dequeue-* *job-queue* :timeout 0)
(dequeue-* *job-queue* :timeout 0)))
((if (helper-kill-yourself-p)
t
(sb-thread:with-mutex (*mutex*)
(sb-thread:condition-wait *notify* *mutex*)
(if (helper-kill-yourself-p)
t
nil)))
(progn
nil))
(do-stuff-with-job))这两个do循环都可以用来启动线程。但是,如果有多个线程(因为互斥将阻止任何并行操作发生),那么A实际上就无法工作,而且B解决方案看起来/很脏,因为有可能在提取的作业为零的情况下出现侧倾。此外,我并不是真的相信停下来的条件,因为它太长,似乎复杂。
实现(Do)循环的正确方法是什么?只要队列提供的数据正常工作,只要没有新的数据,只要没有新的数据,只要它不应该关闭,它也能休眠吗?最后但并非最不重要的一点是,必须能够在不限数量的多个并行线程中使用这个(Do)循环。
发布于 2013-09-09 18:30:50
解A
是的,解决方案A是对的,互斥锁不会让作业并行执行。
解决方案B
我认为do循环不是适合这项工作的工具。特别是,在您的代码中,有可能将作业从队列中提取出来,并且线程将在不执行它的情况下退出。这种情况是可能的,因为您在应该-终止-检查之前排成了队列。另外,由于在job的变量中定义了do,所以忽略了从dequeue返回的多个值,这也很糟糕,因为您无法有效地检查队列是否为空。此外,在检查线程是否应该在do结束测试表单中停止的情况下,您必须获得两次*mutex*,以检查线程是否应该停止并去队列(或者您可以发明奇怪的结束测试表单来完成循环主体的工作)。
因此,尽管如此,您必须将所有代码放入do的体内,并将vars和结束测试保持为空,这就是为什么我认为loop在这种情况下更好的原因。
如果您必须使用do循环,您可以轻松地将loop主体封装到其中,例如(do nil (nil nil) *loop-body*)。
我的解决方案
(require :sb-concurrency)
(use-package :sb-concurrency)
(use-package :sb-thread)
(defparameter *kill-yourself* nil)
(defparameter *mutex* (make-mutex))
(defparameter *notify* (make-waitqueue))
#|the queue is thread safe|#
(defparameter *job-queue* (make-queue :name "job-queue"))
(defparameter *timeout* 10)
(defparameter *output-lock* (make-mutex))
(defun output (line)
(with-mutex (*output-lock*)
(write-line line)))
(defun fill-queue (with-data)
(enqueue with-data *job-queue*)
(with-mutex (*mutex*)
(condition-notify *notify*)))
(defun process-job (thread-name job)
(funcall job thread-name))
(defun run-worker (name)
(make-thread
(lambda ()
(output (format nil "starting thread ~a" name))
(loop (with-mutex (*mutex*)
(condition-wait *notify* *mutex* :timeout *timeout*)
(when *kill-yourself*
(output (format nil "~a thread quitting" name))
(return-from-thread nil)))
;; release *mutex* before starting the job,
;; otherwise it won't allow other threads wait for new jobs
;; you don't want to make 2 separate calls (queue-empty-p, dequeue)
;; since inbetween queue can become empty
(multiple-value-bind (job has-job) (dequeue *job-queue*)
(if has-job
(process-job name job)))))
:name name))
(defun stop-work ()
(with-mutex (*mutex*)
(setf *kill-yourself* t)
(condition-broadcast *notify*)))
(defun add-job (job)
;; no need to put enqueue in critical section
(enqueue job *job-queue*)
(with-mutex (*mutex*)
(condition-notify *notify*)))
(defun make-job (n)
(lambda (thread-name)
(loop for i upto 1000 collecting i)
(output (format nil "~a thread executes ~a job" thread-name n))))
(defun try-me ()
(run-worker "worker1")
(run-worker "worker2")
(loop for i upto 1000 do
(add-job (make-job i)))
(loop for i upto 2000 collecting i)
(stop-work))在REPL中调用try-me应该提供如下输出
starting thread worker1
worker1 thread executes 0 job
worker1 thread executes 1 job
worker1 thread executes 2 job
worker1 thread executes 3 job
starting thread worker2
worker2 thread executes 4 job
worker1 thread executes 5 job
worker2 thread executes 6 job
worker1 thread executes 7 job
worker1 thread executes 8 job
...
worker2 thread executes 33 job
worker1 thread executes 34 job
worker2 thread executes 35 job
worker1 thread executes 36 job
worker1 thread executes 37 job
worker2 thread executes 38 job
0
worker1 thread executes 39 job
worker2 thread quitting
worker1 thread quitting我找不到旧SBCL的文档,所以我把翻译留给您。希望能帮上忙。
编辑类解决方案
在对您的(已删除的)答案的注释中,我们发现您希望为事件循环提供一个类。我想出了以下几点
(defclass event-loop ()
((lock
:initform (make-mutex))
(queue
:initform (make-waitqueue))
(jobs
:initform (make-queue))
(stopped
:initform nil)
(timeout
:initarg :wait-timeout
:initform 0)
(process-job
:initarg :process-job
:initform #'identity)
(worker-count
:initarg :worker-count
:initform (error "Must supply worker count"))))
(defmethod initialize-instance :after ((eloop event-loop) &key)
(with-slots (worker-count timeout lock queue jobs process-job stopped) eloop
(dotimes (i worker-count)
(make-thread
(lambda ()
(loop (with-mutex (lock)
(condition-wait queue lock :timeout timeout)
(when stopped
(return-from-thread nil)))
;; release *mutex* before starting the job,
;; otherwise it won't allow other threads wait for new jobs
;; you don't want to make 2 separate calls (queue-empty-p, dequeue)
;; since inbetween queue can become empty
(multiple-value-bind (job has-job) (dequeue jobs)
(if has-job
(funcall process-job job)))))))))
(defun push-job (job event-loop )
(with-slots (lock queue jobs) event-loop
(enqueue job jobs)
(with-mutex (lock)
(condition-notify queue))))
(defun stop-loop (event-loop)
(with-slots (lock queue stopped) event-loop
(with-mutex (lock)
(setf stopped t)
(condition-broadcast queue))))你可以这样用它
> (defparameter *el* (make-instance 'event-loop :worker-count 10 :process-job #'funcall))
> (defparameter *oq* (make-queue))
> (dotimes (i 100)
(push-job (let ((n i)) (lambda ()
(sleep 1)
(enqueue (format nil "~a job done" n) *oq*))) *el*))它使用sb-thread:queue作为输出,以避免奇怪的结果。当这起作用时,您可以在REPL中检查*oq*。
> *oq*
#S(QUEUE
:HEAD (SB-CONCURRENCY::.DUMMY. "7 job done" "1 job done" "9 job done"
"6 job done" "2 job done" "11 job done" "10 job done" "16 job done"
"12 job done" "4 job done" "3 job done" "17 job done" "5 job done"
"0 job done" "8 job done" "14 job done" "25 job done" "15 job done"
"21 job done" "28 job done" "13 job done" "23 job done" "22 job done"
"19 job done" "27 job done" "18 job done")
:TAIL ("18 job done")
:NAME NIL)发布于 2013-09-09 11:06:38
我使用了库chanl,它提供了消息队列机制。当我希望线程关闭时,我只需将关键字:stop发送到队列中。当然,这并不是在队列中的:stop之前完成所有事情之前停止的。如果希望更早停止,可以在数据队列之前创建另一个队列(控制队列)。
https://stackoverflow.com/questions/18689986
复制相似问题