为了做到这一点,我正在考虑保持一个映射。这里有一个粗略的解决方案,您可以在REPL中使用:
(ns play
(:require [clojure.core.async :as a :refer [go <! go-loop >!]]))
(def recordings (atom {}))
(defn record-subquery! [client-id query-n subquery-n]
(swap! recordings update subquery-n
(fn [prev]
(let [prev (or prev #{})]
(conj prev [client-id query-n])))))
(defn go-subquery [client-id query-n subquery-n]
(go
(<! (a/timeout (rand-int 2000)))
(record-subquery! client-id query-n subquery-n)
{:client-id client-id
:query-n query-n
:subquery-n subquery-n}))
(defn go-query [client-id query-n]
(go
(let [subquery-ns (range query-n (+ query-n 5))]
{:client-id client-id
:query-n query-n
:subqueries (->> subquery-ns
(map (partial go-subquery client-id query-n))
a/merge
(a/into [])
<!)})))
(comment
(go (prn (<! (go-query :a 1)))))
(def client-chans {:a (a/chan)
:b (a/chan)})
(defn client-worker [client-id query-chan]
(go-loop []
(when-some [q (<! query-chan)]
(prn (format "queried id = %s q = %s" client-id (<! (go-query client-id q))))
(recur))))
(def invalidation-chan (a/chan))
(defn invalidation-broadcaster []
(go (loop []
(<! (a/timeout 1500))
(when (>! invalidation-chan (rand-int 10))
(recur)))))
(defn invalidation-worker [chan]
(go-loop []
(when-some [sq-id (<! chan)]
(let [subs (->> sq-id (@recordings))]
(prn (format "invalidating sq-id = %s subs = %s" sq-id subs))
(doseq [[client-id query-n] subs]
(>! (client-id client-chans) query-n))
(recur)))))
(comment
(do (client-worker :a (:a client-chans))
(client-worker :b (:b client-chans))
(invalidation-worker invalidation-chan)
(invalidation-broadcaster))
(a/close! invalidation-chan)
(go (>! (:a client-chans) 1)))带解的
我有点难过,因为record-subquery!是嵌套在go-subquery下的。这使得go-query具有状态。不过,我这样做是为了避免以下竞赛条件:
T0: go-query starts
T1: subquery-1 completes
T2: subquery-1 is invalidated
T3: subquery-2 completes
T4: go-query completes在这个场景中,我们将错过T2更新。
你会不这么做吗?
发布于 2023-05-25 16:00:22
您没有写下任何不变量,问题陈述也不完全清楚。这是我听到的。
我们返回有效的查询结果,由耗时的子查询构建而成.每个子查询在任何时候都可能失效,从而影响到整个结果。
这听起来像是梅西,因为在查询结果即将返回之前,可以随时发送无效消息。当前代码所能做出的最有力的承诺是,在返回之前立即检查它是否无效,并找到一个空队列。
假设通道用时间戳或Lamport时钟标记所有传入消息。当报告零消息时,它返回一个类似的计数器。现在,我们可以通过用计数器注释查询结果来合理地描述查询结果的有效性。可能会有随后的无效消息,但它们将被标记为大于有效结果中的计数器值。
考虑为通道大小指定一个“大”值,以便使无效者不可能阻塞。当前默认的1大小会引起生产者和消费者之间的锁定步长。
考虑执行所有(耗时!)为了简单起见,无条件地进行子查询,然后进行(快速)有效性检查。这样做的目的是通过一个严密的验证循环,一旦它看到通道是空的,就可以退出。
https://codereview.stackexchange.com/questions/285162
复制相似问题