
Redis List:打造高效消息队列的秘密武器【redis实战 一】
在快节奏的技术世界中,消息队列是连接不同服务和组件的关键。而在这个领域,Redis Streams作为一种新兴的消息队列解决方案,以其高性能和易用性吸引了众多开发者的目光。当这项技术遇到了Spring Boot —— 当今最受欢迎的Java开发框架,它们的结合将如何开启新的可能性?让我们开始这趟探索之旅,深入了解如何将这两种强大的技术融合在一起,打造出优雅而强大的消息队列系统。
Redis Streams是Redis数据库的一个强大类型,于Redis 5.0中引入。它主要用于消息队列和事件流的存储与传递,是一个高性能、持久化的日志数据结构。以下是Redis Streams的一些基本概念和核心特性:
<时间戳>-<序列号>。XREAD或XREADGROUP命令,你可以实时监听并处理新添加到流中的消息。XRANGE、XREVRANGE等命令,可以查询流中的历史消息,这对于数据分析、审计和消息重放非常有用。<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>package fun.bo.config;
import fun.bo.consumer.MessageConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import java.time.Duration;
/**
* @author xiaobo
*/
@Configuration
public class StreamConfig {
@Bean
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(
RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {
// 用于配置消息监听容器的选项。在这个方法中,通过设置不同的选项,如轮询超时时间和消息的目标类型,可以对消息监听容器进行个性化的配置。
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 设置了轮询超时的时间为100毫秒。这意味着当没有新的消息时,容器将每隔100毫秒进行一次轮询。
.pollTimeout(Duration.ofMillis(100))
// 指定了消息的目标类型为 String。这意味着容器会将接收到的消息转换为 String 类型,以便在后续的处理中使用。
.targetType(String.class)
.build();
// 创建一个可用于监听Redis流的消息监听容器。
StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer =
StreamMessageListenerContainer.create(connectionFactory, options);
// 方法配置了容器来接收来自特定消费者组和消费者名称的消息。它还指定了要读取消息的起始偏移量,以确定从哪里开始读取消息。
listenerContainer.receive(
Consumer.from("your-consumer-group", "your-consumer-name"),
StreamOffset.create("your-stream-name", ReadOffset.lastConsumed()), messageConsumer);
// 方法启动了消息监听容器,使其开始监听消息。一旦容器被启动,它将开始接收并处理来自Redis流的消息。
listenerContainer.start();
return listenerContainer;
}
}package fun.bo.produce;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
* @author xiaobo
*/
@Service
@RequiredArgsConstructor
public class MessageProducer {
private final RedisTemplate<String, String> redisTemplate;
public void sendMessage(String streamKey, String messageKey, String message) {
Map<String, String> messageMap = new HashMap<>();
messageMap.put(messageKey, message);
RecordId recordId = redisTemplate.opsForStream().add(streamKey, messageMap);
if (recordId != null) {
System.out.println("Message sent to Stream '" + streamKey + "' with RecordId: " + recordId);
}
}
}package fun.bo.consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Service;
/**
* @author xiaobo
*/
@Service
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {
@Override
public void onMessage(ObjectRecord<String, String> message) {
String stream = message.getStream();
String messageId = message.getId().toString();
String messageBody = message.getValue();
System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId);
System.out.println("Message body: " + messageBody);
}
}如果是已经存在stream,则可以不配置,这个主要是为了防止启动报错,org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key ‘your-stream-name’ or consumer group ‘your-consumer-group’ in XREADGROUP with GROUP option
public void initializeStream() {
StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
// 创建一个流
try {
streamOperations.createGroup("your-stream-name", ReadOffset.from("0"), "your-consumer-group");
} catch (Exception e) {
// 流可能已存在,忽略异常
}
}
Redis Streams作为消息队列相比于使用传统的Redis List类型,引入了一系列改进和新功能,同时也与专业的高级消息队列系统(如RabbitMQ、Kafka等)相比存在一些差距。以下是详细的分析:
- **List**:虽然List可以保持插入顺序,但在高并发情况下,确保生产者和消费者的顺序一致性较为复杂。
- **Streams**:提供了全局唯一的、基于时间的ID来标识消息,确保了消息的全局顺序。- **List**:原生List类型不支持消费者组的概念,实现多消费者协调处理同一任务队列较为复杂。
- **Streams**:原生支持消费者组,允许多个消费者共享负载,并跟踪各自的进度。- **List**:读取或消费消息后,需要显式删除,否则会一直保留在List中,处理大量消息时可能会导致内存问题。
- **Streams**:消息即使被消费,仍然保留在Stream中,可以随时查询历史消息,且不会因消费而被移除。- **List**:List提供的操作相对简单,复杂的读取逻辑(如按时间范围查询)需要额外的逻辑来实现。
- **Streams**:提供了复杂的查询命令,如`XRANGE`、`XREVRANGE`,可以按ID范围(时间范围)查询消息。- **List**:需要手动实现消息确认和重试机制,管理起来较为复杂。
- **Streams**:提供了消息确认(`XACK`)和挂起消息查询(`XPENDING`)的功能,使得消息的重试和故障处理更加容易。- **Redis Streams**:虽然提供持久化,但在处理复杂事务和确保消息持久性方面不如一些专业的消息队列系统(如Kafka的WAL日志)。- **Redis Streams**:在集群环境下使用稍显复杂,且对于数据分区和扩展性的支持不如专业的消息队列系统(如Kafka的分区机制)。- **Redis Streams**:虽然有基本的监控命令,但没有一些高级消息队列系统提供的丰富的管理界面和监控工具。- **Redis Streams**:缺乏一些高级消息队列提供的消息路由和过滤功能(如RabbitMQ的Exchange和Binding)。- **Redis Streams**:提供了基础的至少一次处理语义,但可能不像某些系统那样支持严格的只处理一次语义。Redis Streams提供了一个轻量级、高性能且功能丰富的消息队列实现,解决了使用List作为队列时的许多痛点,特别适合需要快速部署、低延迟和简单可靠的场景。然而,对于需要复杂事务处理、高级路由和过滤、或更丰富管理工具的复杂应用场景,专业的消息队列系统可能更加适合。选择哪种方案,应根据你的具体需求、资源和技术栈来决定。