core.async新手在这里,但试图通过构建一个自动重新连接的websocket来学习。其思想是将套接字抽象出来,这样使用它的任何人都不需要担心它是否连接,只需将消息放到通道上即可。如果套接字已连接,则会立即从通道读取并发送消息。如果套接字当前未连接,它将反复尝试重新连接,并在连接建立后等待发送所有挂起的消息。
这就是我到目前为止所知道的:
(ns frontend.socket
(:require-macros [cljs.core.async.macros :as asyncm :refer [go]])
(:require [cljs.core.async :as async :refer [<! >! chan timeout]]))
(def endpoint "ws://localhost:8080")
(def socket (atom nil))
(def open (chan))
(def in (chan))
(def out (chan))
(def error (chan))
(def close (chan))
(defn open? []
(= (.-readyState @socket) (.-OPEN @socket)))
(defn event>chan
"Given a core.async channel, returns a
function which takes an event and feeds
it into the formerly given channel."
[channel]
(fn [e]
(go (>! channel e))))
(defn connect
"Opens a websocket, stores it in the socket
atom, and feeds the socket's events into
the corresponding channels."
[]
(let [s (js/WebSocket. endpoint)]
(reset! socket s)
(set! s.onopen (event>chan open))
(set! s.onmessage (event>chan in))
(set! s.onerror (event>chan error))
(set! s.onclose (event>chan close))))
(defn init
"This is the entry point from outside this
namespace. Eagerly connects and then, if
ever disconnected, attempts to reconnect
every second. Also takes messages from
the out channel and sends them through
the socket."
[]
(go
(while true
(connect)
(<! close)
(<! (timeout 1000))))
(go
(while true
(let [m (<! out)]
(when (or (open?) (<! open))
(.send @socket m))))))重新连接逻辑似乎工作得很好,但我在套接字关闭后尝试发送消息时遇到了问题。在最后几行中,我在发送消息之前检查套接字是否打开或等待它打开。问题是,如果在套接字之前打开、然后关闭、然后发送消息时没有发送任何消息,则仍有一些东西驻留在打开的通道上,因此无论如何,消息都会被发送。因此,开放的通道可能会变得“陈旧”。
每当套接字关闭时,我都想从打开的通道中获取,但这正好颠倒了问题:当我确实需要这些值时,打开的通道可能会遗漏这些值,并且在重新连接时不会发送消息。
我在这里做错了什么?
发布于 2019-12-24 00:13:03
这是错误的方法,因为通道将等待被消耗。我需要使用发布/订阅:https://github.com/clojure/core.async/wiki/Pub-Sub/549da1843c7bbe6009e9904eed49f91000d8ce2c
https://stackoverflow.com/questions/59347170
复制相似问题