首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用惰性序列和SSE的有状态服务--如何分配容错?

使用惰性序列和SSE的有状态服务--如何分配容错?
EN

Stack Overflow用户
提问于 2020-01-04 02:01:39
回答 1查看 229关注 0票数 0

我使用Clojure中的惰性序列和各种无限级数公式(Euler,Leibniz)编写了一个web服务来生成Pi的估计值。Clojure服务通过服务器发送的事件通道发送这些估计值。目前,一个HTML/JS视图正在使用Vue.js来消费和显示SSE事件。

只要SSE通道的连接没有关闭,它就可以作为具有单个节点的服务工作得很好。但到目前为止,如果连接关闭或服务终止,它不会持续或备份减少的状态(无限系列中的位置)来从故障中恢复。此外,由于状态包含在服务的本地内存中(在Clojure序列值中),因此不存在水平可伸缩性,例如,如果长期内存状态存在于Redis中,则存在水平可伸缩性。在这种情况下,仅仅添加新节点并不能提供实际划分工作的方法--它只会复制相同的系列。使用Redis卸载长期内存状态是我习惯于无状态web服务的那种设置,以简化水平扩展和容错策略。

在这种有状态的情况下,我有点困惑于如何使用分布式多节点解决方案扩展Clojure服务,该解决方案可以并行处理系列术语。也许可以有一个分派“主”服务,将序列范围委托给不同的节点,并发地从节点接收结果(通过Redis pub/sub),以数学方式聚合它们,并为视图生成结果SSE流?在这种情况下,主服务将使用间隔约为1000的无限系列的数字来产生范围界限,并行节点可以使用这些范围界限来初始化非无限的Clojure序列(可能仍然是惰性的)?当然,在这种情况下,我需要标记哪些序列范围在进入时是完整的,并在处理某个范围期间出现节点故障的情况下使用重试策略。

我正在研究Kubernetes有状态集,以便熟悉有状态服务的部署模式,尽管我还没有遇到适合这个特定问题的模式或解决方案。如果这是一个无状态的服务,Kubernetes解决方案将是显而易见的,但有状态的方法在Kubernetes环境中给我留下了一张白板。

有人能为我指出一个好的架构方向吗?假设我确实希望将系列术语的状态封装在Clojure惰性序列中(即,在本地服务内存中),那么我划分工作的策略是正确的吗?

下面是单节点Clojure服务的相关代码:

代码语言:javascript
复制
(ns server-sent-events.service
  (:require [io.pedestal.http :as http]
            [io.pedestal.http.sse :as sse]
            [io.pedestal.http.route :as route]
            [io.pedestal.http.route.definition :refer [defroutes]]
            [ring.util.response :as ring-resp]
            [clojure.core.async :as async]
  )
)

(defn seq-of-terms
   [func]
   (map func (iterate (partial + 1) 0))
)

(defn euler-term [n]
  (let [current (+ n 1)] (/ 6.0 (* current current)))
)

; The following returns a lazy list representing iterable sums that estimate pi
; according to the Euler series for increasing amounts of terms in the series.
; Sample usage: (take 100 euler-reductions)
(def euler-reductions
  (map (fn [sum] (Math/sqrt sum))  (reductions + (seq-of-terms euler-term) ))
)

(defn leibniz-term [n] ; starts at zero
   (let [
          oddnum (+ (* 2.0 n) 1.0)
          signfactor (- 1 (* 2 (mod n 2)))
        ]
        (/ (* 4.0 signfactor) oddnum)
  )
)

; The following returns a lazy list representing iterable sums that estimate pi
; according to the Leibniz series for increasing amounts of terms in the series.
; Sample usage: (take 100 leibniz-reductions)
(def leibniz-reductions (reductions + (seq-of-terms leibniz-term)))

(defn send-result
  [event-ch count-num rdcts]
  (doseq [item rdcts]
    (Thread/sleep 150) ; we must use a naive throttle here to prevent an overflow on the core.async CSP channel, event-ch
    (async/put! event-ch (str item))
  )
)

(defn sse-euler-stream-ready
  "Start to send estimates to the client according to the Euler series"
  [event-ch ctx]
  ;; The context is passed into this function.
  (let
    [
      {:keys [request response-channel]} ctx
      lazy-list euler-reductions
    ]
    (send-result event-ch 10 lazy-list)
  )
)

(defn sse-leibniz-stream-ready
  "Start to send estimates to the client according to the Leibniz series"
  [event-ch ctx]
  (let
    [
      {:keys [request response-channel]} ctx
      lazy-list leibniz-reductions
    ]
    (send-result event-ch 10 lazy-list)
  )
)


;; Wire root URL to sse event stream
;; with custom event-id setting
(defroutes routes
  [[["/" {:get [::send-result-euler (sse/start-event-stream sse-euler-stream-ready)]}
    ["/euler" {:get [::send-result
                    (sse/start-event-stream sse-euler-stream-ready)]}]
    ["/leibniz" {:get [::send-result-leibniz
                      (sse/start-event-stream sse-leibniz-stream-ready)]}]
    ]]])

(def url-for (route/url-for-routes routes))

(def service {:env :prod
              ::http/routes routes
              ;; Root for resource interceptor that is available by default.
              ::http/resource-path "/public"
              ;; Either :jetty or :tomcat (see comments in project.clj
              ;; to enable Tomcat)
              ::http/type :jetty
              ::http/port 8080
              ;;::http/allowed-origins ["http://127.0.0.1:8081"]
              }
)

完整的代码在https://github.com/wclark-aburra-code/pi-service上。包含内联Vue.js代码,它使用SSE流。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-01-04 12:19:00

如果只是为了伸缩,我不认为你需要持久化任何东西。您所需要的只是一个调度“主节点”(可能是客户端本身),它从多个后端请求分块的序列,并对它们进行重新组装,以便以正确的顺序交付。

使用core.async,可以像这样实现调度主机:

代码语言:javascript
复制
(let [batch-ch (async/chan)
      out-ch   (async/chan)]

  ;; request for 100 batches (or infinite)
  (async/onto-chan batch-ch (range 100))
  ;; consume the result by pushing it back to the sse channel
  (async/go-loop []
    (when-let [res (async/<! out-ch)]
      (log/info ::result res)
      (recur)))

  ;;
  ;; take each batch number from batch-ch and dispatch it to the backend
  ;; in parallel. You would also add an exception handler in here.
  ;;
  (async/pipeline-async
   ;; parallelism
   32
   ;; output
   out-ch
   ;; invoke backend service, this should return immediately
   (fn [batch ch]
     (let [batch-sz 1000]
       (async/go
         (let [start (* batch batch-sz)
               end   (-> batch inc (* batch-sz))]
           (log/info ::fetching-from-service start end)
           ;; simulate a slow service
           (async/<! (async/timeout 1000))
           ;; push the result back to the pipeline and close the channel
           ;; (here I just return the term itself)
           (async/onto-chan ch (range start end))))))
   ;; input  ;;
   batch-ch))
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59583242

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档