目前,我正在尝试使用处理一些长时间运行的任务(作业)的RESTful应用程序接口来构建一个web服务。
其想法是,用户通过执行POST提交作业,该POST返回一些用于检查作业状态的URL,其中还包含结果的url。一旦作业完成(例如,将一些值写入数据库),结果URL将返回适当的信息(而不是无结果),并且作业url将指示已完成状态。
不幸的是,计算非常密集,因此一次只能运行一个,因此作业需要排队。
如果是伪的,就需要这样的东西
(def job-queue (atom queue)) ;; some queue
(def jobs (atom {}))
(defn schedule-job [params]
;; schedules the job into the queue and
;; adds the job to a jobs map for checking status via GET
;; note that the job should not be evaluated until popped from the queue
)
(POST "/analyze" [{params :params}]
(schedulde-job params))
(GET "job/:id" [:d]
(get @jobs id))
;; Some function that pops the next item from the queue
;; and evaluates it when the previous item is complete
;; Note: should not terminate when queue is empty! 我研究过允许异步处理的Lamina,但它似乎不适合我的需求。
我的问题是如何出列作业队列,并在前一个任务完成后执行其任务,而不会在队列为空时终止,即永久处理传入的作业。
发布于 2013-02-04 04:12:11
java.util.concurrent.ExecutorService可能就是你想要的。这允许您提交一个作业以供以后执行,并返回一个Future,您可以查询该作业是否已完成。
(import '[java.util.concurrent Callable Executors])
(def job-executor
(Executors/newSingleThreadExecutor))
(def jobs (atom {}))
(defn submit-job [func]
(let [job-id (str (java.util.UUID/randomUUID))
callable (reify Callable (call [_] (func))]
(swap! jobs assoc job-id (.submit job-executor callable))
job-id))
(use 'compojure.core)
(defroutes app
(POST "/jobs" [& params]
(let [id (submit-job #(analyze params))]
{:status 201 :headers {"Location" (str "/jobs/" id)}}))
(GET "/jobs/:id" [id]
(let [job-future (@jobs id)]
(if (.isDone job-future)
(.get job-future)
{:status 404}))))发布于 2013-02-04 00:52:35
这似乎在做我所期望的事情,但它看起来确实不太地道。有谁对如何改进这一点有想法吗?
;; Create a unique identifier
(defn uuid [] (str (java.util.UUID/randomUUID)))
;; Create a job-queue and a map for keeping track of the status
(def job-queue (ref clojure.lang.PersistentQueue/EMPTY))
(def jobs (atom {}))
(defn dequeue! [queue-ref]
;; Pops the first element off the queue-ref
(dosync
(let [item (peek @queue-ref)]
(alter queue-ref pop)
item)))
(defn schedule-job! [task]
;; Schedule a task to be executed, expects a function (task) to be evaluated
(let [uuid (uuid)
job (delay task)]
(dosync
(swap! jobs assoc uuid job)
(alter job-queue conj job))))
(defn run-jobs []
;; Runs the jobs
(while true
(Thread/sleep 10)
(let [curr (dequeue! job-queue)]
(if-not (nil? curr) (@curr)))))
(.start (Thread. run-jobs))发布于 2013-02-04 01:26:44
您的描述看起来像是一个多生产者和单消费者的场景。下面是一个示例代码(您可以将其与REST内容以及一些可能的异常处理相结合,这样代理就不会死)
(def worker (agent {}))
(defn do-task [name func]
(send worker
(fn [results]
(let [r (func)]
(assoc results name r)))))
;submit tasks
(do-task "uuid1" #(print 10))
(do-task "uuid2" #(+ 1 1))
;get all results
(print @worker) https://stackoverflow.com/questions/14673108
复制相似问题