我在我的go应用程序中有一个redis公共连接,所以每当客户端连接和订阅一个频道时,它都会监听并发送消息。但是,假设客户端1订阅了通道X,公共子就会开始监听和接收来自它的消息。
现在,客户端1也订阅了通道Y,因此服务器也应该侦听来自该通道的消息,但是它不再监听X,而只监听Y。
for {
switch v := gPubSubConn.Receive().(type) {
case redis.Message:
log.Printf("Received message from %s", v.Channel)
subscriptions := ps.GetSubscriptions(v.Channel, nil)
for _, sub := range subscriptions {
if v.Channel == types.TaskResults {
go sendTaskResultMessage(v.Data, sub)
} else if v.Channel == types.TaskCount {
go sendTaskCountMessage(v.Data, sub)
}
}
case redis.Subscription:
log.Printf("Subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count)
case error:
log.Println("Error pub/sub, delivery stopped")
return
}下面是一个日志输出示例
go-1 | New Client is connected, total: 1
go-1 | 2022/02/16 17:36:03 signature is invalid
go-1 | 2022/02/16 17:36:03 Subscription message: task_count: subscribe 1
go-1 | 2022/02/16 17:36:06 Received message from task_count
go-1 | 2022/02/16 17:36:06 Received message from task_count
go-1 | New Client is connected, total: 2
go-1 | 2022/02/16 17:36:14 signature is invalid
go-1 | 2022/02/16 17:36:14 Subscription message: task_results: subscribe 1
go-1 | 2022/02/16 17:36:16 Received message from task_count
go-1 | 2022/02/16 17:36:16 Received message from task_results
go-1 | 2022/02/16 17:36:16 Received message from task_results
go-1 | 2022/02/16 17:36:21 Received message from task_results
go-1 | 2022/02/16 17:36:21 Received message from task_results
go-1 | 2022/02/16 17:36:26 Received message from task_results
go-1 | 2022/02/16 17:36:26 Received message from task_results
go-1 | 2022/02/16 17:36:31 Received message from task_results
go-1 | 2022/02/16 17:36:31 Received message from task_results有什么好主意吗?
按评论编辑:
type PubSub struct {
Clients []Client
Subscriptions []Subscription
}
type Client struct {
Id string
Connection *websocket.Conn
}
type Message struct {
Action string `json:"action"`
Topic string `json:"topic"`
Message json.RawMessage `json:"message"`
Token string `json:"token"`
}
type Subscription struct {
Topic string
Client *Client
UserId string
}
func (ps *PubSub) GetSubscriptions(topic string, client *Client) []Subscription {
var subscriptionList []Subscription
for _, subscription := range ps.Subscriptions {
if client != nil {
if subscription.Client.Id == client.Id && subscription.Topic == topic {
subscriptionList = append(subscriptionList, subscription)
}
} else {
if subscription.Topic == topic {
subscriptionList = append(subscriptionList, subscription)
}
}
}
return subscriptionList
}这是我的websocket处理程序
func websocketHandler(w http.ResponseWriter, r *http.Request) {
gRedisConn, err := gRedisConn()
if err != nil {
log.Panic(err)
}
gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
upgrader.CheckOrigin = func(r *http.Request) bool {
return true
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := pubsub.Client{
Id: autoId(),
Connection: conn,
}
// add this client into the list
ps.AddClient(client)
fmt.Println("New Client is connected, total: ", len(ps.Clients))
for {
messageType, p, err := conn.ReadMessage()
if err != nil {
log.Println("Something went wrong", err)
ps.RemoveClient(client)
log.Println("total clients and subscriptions ", len(ps.Clients), len(ps.Subscriptions))
return
}
go listenToMessages()
ps.HandleReceiveMessage(client, messageType, p, gPubSubConn)
}
}
func (ps *PubSub) HandleReceiveMessage(client Client, messageType int, payload []byte, gPubSubConn *redis.PubSubConn) *PubSub {
m := Message{}
err := json.Unmarshal(payload, &m)
if err != nil {
fmt.Println("This is not correct message payload")
return ps
}
switch m.Action {
case PUBLISH:
ps.Publish(m.Topic, m.Message, nil)
case SUBSCRIBE:
ps.Subscribe(&client, m.Topic, gPubSubConn, m.Token)
case UNSUBSCRIBE:
fmt.Println("Client want to unsubscribe the topic", m.Topic, client.Id)
default:
break
}
return ps
}
func (ps *PubSub) Subscribe(client *Client, topic string, gPubSubConn *redis.PubSubConn, token string) *PubSub {
clientSubs := ps.GetSubscriptions(topic, client)
if len(clientSubs) > 0 {
return ps
}
userId := utils.GetUser(token)
newSubscription := Subscription{
Topic: topic,
Client: client,
UserId: userId,
}
ps.Subscriptions = append(ps.Subscriptions, newSubscription)
if err := gPubSubConn.Subscribe(topic); err != nil {
log.Panic(err)
}
return ps
}发布于 2022-02-16 19:42:36
直接问题是由websocketHandler中的这一行引起的。
gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}此行用新连接替换当前的pubsub连接。新连接没有任何订阅。之前的连接被泄露了。
在应用程序启动时创建一次pubsub连接。
应用程序至少有一个数据竞赛。使用种族检测器运行应用程序并修复报告的问题。
https://stackoverflow.com/questions/71146678
复制相似问题