I wrote a web service in Clojure that generates estimates of Pi using lazy sequences and various infinite-series formulas (Euler, Leibniz). The Clojure service sends these estimates over a Server-Sent Events (SSE) channel. Currently, an HTML/JS view uses Vue.js to consume and display the SSE events.
This works fine as a single-node service as long as the SSE connection stays open. But so far, if the connection closes or the service dies, it does not persist or back up the reduction state (the position in the infinite series) to recover from the failure. Also, because the state lives in the service's local memory (inside a Clojure sequence value), there is no horizontal scalability the way there would be if, say, the long-lived state were kept in Redis. As it stands, simply adding new nodes would not give me a way to actually partition the work; each node would just duplicate the same series. Offloading long-lived state to Redis is the kind of setup I am used to with stateless web services, to simplify horizontal scaling and fault-tolerance strategies.
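For the persistence piece alone, one option is to checkpoint the series position (the current term index) in Redis so that a restarted node can resume where the series left off. A minimal sketch, assuming the com.taoensso/carmine Redis client; the key names, connection details, and helper names (`checkpoint-position!`, `resume-position`) are illustrative only:

```clojure
(require '[taoensso.carmine :as car])

(def conn {:pool {} :spec {:uri "redis://localhost:6379"}})

;; Persist the current term index for a given series (e.g. "euler").
(defn checkpoint-position! [series-key n]
  (car/wcar conn (car/set series-key n)))

;; On restart, resume from the last checkpointed index (or 0 if none).
(defn resume-position [series-key]
  (if-let [v (car/wcar conn (car/get series-key))]
    (Long/parseLong v)
    0))
```

A resuming stream would then rebuild its lazy sequence starting at `(resume-position "euler")` instead of 0. Note that for the running-sum reductions, the accumulated sum would need to be checkpointed alongside the index.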
In this stateful situation, I am a bit stumped on how to scale the Clojure service with a distributed, multi-node solution that processes series terms in parallel. Perhaps there could be a dispatching "master" service that delegates sequence ranges to different nodes, receives results from the nodes concurrently (via Redis pub/sub), aggregates them mathematically, and produces the resulting SSE stream for the view? In that case, the master would emit range bounds over the infinite series at intervals of, say, 1000 terms, and the parallel nodes would use those bounds to initialize non-infinite (though perhaps still lazy) Clojure sequences. Of course, I would then need to mark which sequence ranges are complete as they come in, and apply a retry policy when a node fails while processing a range.
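For the worker side of that idea, a node could initialize a bounded (still lazy) sequence from the range bounds it receives. A minimal sketch, with hypothetical helper names (`seq-of-terms-range`, `partial-sum`) and `term-fn` standing for a term function like `euler-term` below; note that for the Euler series the partial sums must be combined before taking the square root:

```clojure
;; Terms of a series restricted to the half-open range [start, end).
(defn seq-of-terms-range [term-fn start end]
  (map term-fn (range start end)))

;; A worker's contribution for one delegated range: the sum of its terms.
;; Sums over disjoint ranges are additive, so the master can combine them
;; in any order and apply any final transform (e.g. Math/sqrt for Euler)
;; only after aggregation.
(defn partial-sum [term-fn start end]
  (reduce + (seq-of-terms-range term-fn start end)))
```

For example, `(Math/sqrt (+ (partial-sum euler-term 0 1000) (partial-sum euler-term 1000 2000)))` gives the same estimate as a single node summing the first 2000 Euler terms.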
I am looking into Kubernetes StatefulSets to get familiar with deployment patterns for stateful services, though I have not yet found a pattern or solution that fits this particular problem. If this were a stateless service, the Kubernetes solution would be obvious, but the stateful approach leaves me with a blank slate in a Kubernetes context.
Can anyone point me in a good architectural direction? Assuming I do want to keep the state of the series terms encapsulated in Clojure lazy sequences (i.e., in local service memory), is my strategy for partitioning the work on the right track?
Here is the relevant code for the single-node Clojure service:
(ns server-sent-events.service
  (:require [io.pedestal.http :as http]
            [io.pedestal.http.sse :as sse]
            [io.pedestal.http.route :as route]
            [io.pedestal.http.route.definition :refer [defroutes]]
            [ring.util.response :as ring-resp]
            [clojure.core.async :as async]))
(defn seq-of-terms
  [func]
  (map func (iterate (partial + 1) 0)))

(defn euler-term [n]
  (let [current (+ n 1)]
    (/ 6.0 (* current current))))
; The following returns a lazy list representing iterable sums that estimate pi
; according to the Euler series for increasing amounts of terms in the series.
; Sample usage: (take 100 euler-reductions)
(def euler-reductions
  (map (fn [sum] (Math/sqrt sum))
       (reductions + (seq-of-terms euler-term))))
(defn leibniz-term [n] ; starts at zero
  (let [oddnum (+ (* 2.0 n) 1.0)
        signfactor (- 1 (* 2 (mod n 2)))]
    (/ (* 4.0 signfactor) oddnum)))
; The following returns a lazy list representing iterable sums that estimate pi
; according to the Leibniz series for increasing amounts of terms in the series.
; Sample usage: (take 100 leibniz-reductions)
(def leibniz-reductions (reductions + (seq-of-terms leibniz-term)))
(defn send-result
  [event-ch count-num rdcts]
  (doseq [item rdcts]
    ;; naive throttle to prevent an overflow on the core.async CSP channel, event-ch
    (Thread/sleep 150)
    (async/put! event-ch (str item))))
(defn sse-euler-stream-ready
  "Start to send estimates to the client according to the Euler series"
  [event-ch ctx]
  ;; The context is passed into this function.
  (let [{:keys [request response-channel]} ctx
        lazy-list euler-reductions]
    (send-result event-ch 10 lazy-list)))
(defn sse-leibniz-stream-ready
  "Start to send estimates to the client according to the Leibniz series"
  [event-ch ctx]
  (let [{:keys [request response-channel]} ctx
        lazy-list leibniz-reductions]
    (send-result event-ch 10 lazy-list)))
;; Wire root URL to sse event stream
;; with custom event-id setting
(defroutes routes
  [[["/" {:get [::send-result-euler (sse/start-event-stream sse-euler-stream-ready)]}
     ["/euler" {:get [::send-result
                      (sse/start-event-stream sse-euler-stream-ready)]}]
     ["/leibniz" {:get [::send-result-leibniz
                        (sse/start-event-stream sse-leibniz-stream-ready)]}]]]])
(def url-for (route/url-for-routes routes))
(def service {:env :prod
              ::http/routes routes
              ;; Root for resource interceptor that is available by default.
              ::http/resource-path "/public"
              ;; Either :jetty or :tomcat (see comments in project.clj
              ;; to enable Tomcat)
              ::http/type :jetty
              ::http/port 8080
              ;;::http/allowed-origins ["http://127.0.0.1:8081"]
              })

The full code is at https://github.com/wclark-aburra-code/pi-service, including the inline Vue.js code that consumes the SSE stream.
Posted on 2020-01-04 12:19:00
If it is only for scaling, I don't think you need to persist anything. All you need is a dispatching "master" (possibly the client itself) that requests chunked sequences from multiple backends and reassembles them for delivery in the right order.
With core.async, such a dispatching master could be implemented like this (assuming `[clojure.core.async :as async]` and a logging alias such as `[clojure.tools.logging :as log]` are required):
(let [batch-ch (async/chan)
out-ch (async/chan)]
;; request for 100 batches (or infinite)
(async/onto-chan batch-ch (range 100))
;; consume the result by pushing it back to the sse channel
(async/go-loop []
(when-let [res (async/<! out-ch)]
(log/info ::result res)
(recur)))
;;
;; take each batch number from batch-ch and dispatch it to the backend
;; in parallel. You would also add an exception handler in here.
;;
(async/pipeline-async
;; parallelism
32
;; output
out-ch
;; invoke backend service, this should return immediately
(fn [batch ch]
(let [batch-sz 1000]
(async/go
(let [start (* batch batch-sz)
end (-> batch inc (* batch-sz))]
(log/info ::fetching-from-service start end)
;; simulate a slow service
(async/<! (async/timeout 1000))
;; push the result back to the pipeline and close the channel
;; (here I just return the term itself)
(async/onto-chan ch (range start end))))))
;; input ;;
   batch-ch))

https://stackoverflow.com/questions/59583242
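To turn the simulated backend above into real work, the per-batch function could return the batch's partial sum instead of the raw terms; since `pipeline-async` preserves input order on its output channel, the consumer loop can keep a running total and re-emit a refined estimate per batch. A sketch, assuming the `euler-term` function from the question and a hypothetical `process-batch` name:

```clojure
(require '[clojure.core.async :as async])

(defn euler-term [n]
  (let [current (inc n)]
    (/ 6.0 (* current current))))

;; Drop-in replacement for the simulated-service fn in the pipeline above:
;; compute the partial sum for one batch, put the single result on the
;; pipeline's per-batch output channel, then close it.
(defn process-batch [batch ch]
  (let [batch-sz 1000
        start (* batch batch-sz)
        end   (* (inc batch) batch-sz)
        psum  (reduce + (map euler-term (range start end)))]
    (async/go
      (async/>! ch psum)
      (async/close! ch))))

;; In the consumer go-loop, accumulate the partial sums in arrival order;
;; (Math/sqrt running-sum) is the current pi estimate to push to the SSE channel.
```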