在我们当前的应用程序中,我们使用Spring Websockets而不是STOMP。我们正在寻求横向扩展。关于我们应该如何处理多个tomcat实例上的websocket流量,以及如何维护多个nodes.Is上的会话信息,有没有什么最佳实践可以参考?
发布于 2015-07-22 12:35:03
您的需求可以分为两个子任务:
- The first way: Using a full-featured broker (eg: ActiveMQ) and try new feature [Support multiple WebSocket servers](https://jira.spring.io/browse/SPR-11620) (from: 4.2.0 RC1)
- The second way: Using a full-feature broker and implement a distributed `UserSessionRegistry` (eg: Using Redis :D ). The default implementation `DefaultUserSessionRegistry` using an in-memory storage.
更新:我已经用Redis写了一个简单的实现,如果你对感兴趣,可以试试。
要配置功能齐全的代理(代理中继),您可以尝试:
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
...
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("localhost") // broker host
.setRelayPort(61613) // broker port
;
config.setApplicationDestinationPrefixes("/app");
}
@Bean
public UserSessionRegistry userSessionRegistry() {
return new RedisUserSessionRegistry(redisConnectionFactory);
}
...
}和
import java.util.Set;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundHashOperations;
import org.springframework.data.redis.core.BoundSetOperations;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.messaging.simp.user.UserSessionRegistry;
import org.springframework.util.Assert;
/**
* An implementation of {@link UserSessionRegistry} backed by Redis.
* @author thanh
*/
public class RedisUserSessionRegistry implements UserSessionRegistry {
/**
* The prefix for each key of the Redis Set representing a user's sessions. The suffix is the unique user id.
*/
static final String BOUNDED_HASH_KEY_PREFIX = "spring:websockets:users:";
private final RedisOperations<String, String> sessionRedisOperations;
@SuppressWarnings("unchecked")
public RedisUserSessionRegistry(RedisConnectionFactory redisConnectionFactory) {
this(createDefaultTemplate(redisConnectionFactory));
}
public RedisUserSessionRegistry(RedisOperations<String, String> sessionRedisOperations) {
Assert.notNull(sessionRedisOperations, "sessionRedisOperations cannot be null");
this.sessionRedisOperations = sessionRedisOperations;
}
@Override
public Set<String> getSessionIds(String user) {
Set<String> entries = getSessionBoundHashOperations(user).members();
return (entries != null) ? entries : Collections.<String>emptySet();
}
@Override
public void registerSessionId(String user, String sessionId) {
getSessionBoundHashOperations(user).add(sessionId);
}
@Override
public void unregisterSessionId(String user, String sessionId) {
getSessionBoundHashOperations(user).remove(sessionId);
}
/**
* Gets the {@link BoundHashOperations} to operate on a username
*/
private BoundSetOperations<String, String> getSessionBoundHashOperations(String username) {
String key = getKey(username);
return this.sessionRedisOperations.boundSetOps(key);
}
/**
* Gets the Hash key for this user by prefixing it appropriately.
*/
static String getKey(String username) {
return BOUNDED_HASH_KEY_PREFIX + username;
}
@SuppressWarnings("rawtypes")
private static RedisTemplate createDefaultTemplate(RedisConnectionFactory connectionFactory) {
Assert.notNull(connectionFactory, "connectionFactory cannot be null");
StringRedisTemplate template = new StringRedisTemplate(connectionFactory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
}发布于 2017-04-03 06:09:58
水平伸缩WebSockets实际上与水平伸缩基于无状态/有状态HTTP的应用程序非常不同。
水平伸缩无状态HTTP应用程序:只需在不同的机器上启动一些应用程序实例,并在它们前面放置一个负载均衡器。有相当多不同的负载均衡器解决方案,如HAProxy,Nginx等。如果你在云环境中,如亚马逊网络服务,你也可以有管理的解决方案,如弹性负载均衡器。
水平扩展有状态HTTP应用程序:如果我们可以让所有应用程序每次都是无状态的,那就太好了,但不幸的是,这并不总是可能的。因此,在处理有状态的HTTP应用程序时,您必须关注HTTP会话,它基本上是每个不同客户端的本地存储,web服务器可以在其中存储跨不同HTTP请求保存的数据(例如处理购物车时)。那么,在这种情况下,当水平扩展时,您应该知道,正如我所说的,它是一个本地存储,因此ServerA将无法处理ServerB上的HTTP会话。换句话说,如果由于任何原因,由ServerA提供服务的Client1突然开始由ServerB提供服务,他的HTTP会话将丢失(并且他的购物车也将消失!)。原因可能是节点故障,甚至是部署。为了解决这个问题,您不能只在本地保留HTTP会话,也就是说,您必须将它们存储在另一个外部组件上。有几个组件能够处理这一点,例如任何关系数据库,但这实际上是一种开销。一些NoSQL数据库可以很好地处理这种键值行为,比如Redis。现在,随着HTTP会话存储在Redis上,如果一个客户端开始由另一个服务器提供服务,它将从Redis获取客户端的HTTP会话并将其加载到其内存中,因此一切都将继续工作,用户将不再丢失其HTTP会话。您可以使用Spring Session轻松地将HTTP会话存储在Redis上。
横向扩展WebSocket应用程序:当建立WebSocket连接时,服务器必须保持与客户端的连接打开,以便它们可以双向交换数据。当客户端正在侦听诸如“/topic/Publ.messages”之类的目的地时,我们说客户端订阅了该目的地。在Spring中,当您使用simpleBroker方法时,订阅将保存在内存中,那么如果Client1由ServerA提供服务,并且希望使用WebSocket向由ServerB提供服务的Client2发送消息,会发生什么情况呢?你已经知道答案了!该消息将不会被传递到Client2,因为Server1甚至不知道客户端2的订阅。因此,为了解决这个问题,您必须再次外部化WebSockets订阅。当您使用STOMP作为子协议时,您需要一个可以充当外部STOMP代理的外部组件。有相当多的工具可以做到这一点,但我建议使用RabbitMQ。现在,您必须更改Spring配置,使其不会将订阅保存在内存中。相反,它将把订阅委托给外部STOMP代理。您可以通过一些基本配置轻松实现这一点,例如enableStompBrokerRelay。需要注意的重要一点是,HTTP会话与WebSocket会话不同。使用Spring Session在Redis中存储HTTP session与水平扩展WebSockets完全没有任何关系。
我已经用Spring Boot (以及更多)编写了一个完整的网络聊天应用程序,它使用RabbitMQ作为一个完整的外部STOMP代理,它是public on GitHub,所以请克隆它,在您的机器上运行该应用程序并查看代码细节。
当涉及到WebSocket连接丢失时,Spring所能做的并不多。实际上,重新连接必须由实现重新连接回调函数的客户端请求,例如(这是WebSocket握手流程,客户端必须启动握手,而不是服务器)。有一些客户端库可以透明地为您处理此问题。那不是SockJS的案子。在聊天应用程序中,我也实现了这个重新连接功能。
发布于 2016-12-23 15:23:35
维护多个节点的会话信息:
假设我们有2台服务器主机,用负载均衡器备份。
Websockets是从浏览器到特定服务器的套接字连接host.eg host1
现在,如果host1关闭,来自负载均衡器主机1的套接字连接将中断。spring将如何重新打开从负载均衡器到主机2的相同websocket连接?浏览器不应打开新的websocket连接
https://stackoverflow.com/questions/26853745
复制相似问题