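I'm trying to use Redisson as the messaging layer for my project, and I have a question. Is it possible to make Redisson process received messages asynchronously? I created a small example and sent 4 messages from different URLs. I expected Redisson to process them asynchronously, but it handled them one after another. Here is the implementation: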
public class RedisListenerServiceImpl implements MessageListener<String> {
    private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void onMessage(CharSequence channel, String stringMsg) {
        log.info("Message received: {}", stringMsg);
        MessageDto msg;
        try {
            msg = objectMapper.readValue(stringMsg, MessageDto.class);
        } catch (final IOException e) {
            log.error("Unable to deserialize message: {}", e.getMessage(), e);
            return;
        }
        try {
            //Do my stuff
        } catch (Exception e) {
            log.error("Unable to get service from factory: {}", e.getMessage(), e);
        }
    }
}

And the configuration:
@Configuration
public class RedisListenerConfig {
    @Autowired
    public RedisListenerConfig(RedissonClient redisClient,
                               MessageListener redisListenerService,
                               @Value("${redis.sub.key}") String redisSubKey) {
        RTopic subscribeTopic = redisClient.getTopic(redisSubKey);
        subscribeTopic.addListenerAsync(String.class, redisListenerService);
    }
}

Posted on 2021-03-13 01:47:24
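This is the expected behavior. If you want messages to be processed concurrently when the listener's onMessage() method is triggered, hand the work to a thread pool.
Redisson does not know how many threads you want to use to consume the triggered events, so it leaves that implementation detail to you.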
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RedisListenerServiceImpl implements MessageListener<String> {
    private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    // Worker pool; the size of 10 is arbitrary and should be tuned to the workload.
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    @Override
    public void onMessage(CharSequence channel, String stringMsg) {
        log.info("Message received: {}", stringMsg);
        MessageDto msg;
        try {
            msg = objectMapper.readValue(stringMsg, MessageDto.class);
        } catch (final IOException e) {
            log.error("Unable to deserialize message: {}", e.getMessage(), e);
            return;
        }
        // Hand the slow work to the pool so this callback returns immediately.
        executorService.submit(() -> {
            try {
                //Do my stuff
                System.out.println("do something with message: " + msg);
            } catch (Exception e) {
                log.error("Unable to get service from factory: {}", e.getMessage(), e);
            }
        });
    }
}
https://stackoverflow.com/questions/66598346
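To see the effect of the thread pool in isolation, here is a minimal stand-alone sketch that needs neither Redis nor Redisson. It assumes, as described in the question, that the listener callback is invoked for one message at a time; a plain loop stands in for that delivering thread. The class name ListenerOffloadDemo, the method peakConcurrency, and the 200 ms sleep are made up for illustration and are not part of any library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone demo; no Redisson classes are involved.
final class ListenerOffloadDemo {

    /**
     * Delivers `messages` callbacks one after another from the calling thread
     * (standing in for the single delivering thread) and returns the largest
     * number of messages that were being processed at the same moment.
     * poolSize == 0 means "process inline, inside the callback".
     */
    static int peakConcurrency(int messages, int poolSize) throws InterruptedException {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(messages);
        ExecutorService pool = poolSize > 0 ? Executors.newFixedThreadPool(poolSize) : null;

        Runnable work = () -> {
            int now = inFlight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max); // remember the highest overlap seen
            try {
                Thread.sleep(200); // simulated slow message processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                inFlight.decrementAndGet();
                done.countDown();
            }
        };

        // This loop plays the role of onMessage() being called sequentially.
        for (int i = 0; i < messages; i++) {
            if (pool == null) {
                work.run();        // blocks the delivering thread until done
            } else {
                pool.submit(work); // returns immediately, work runs on the pool
            }
        }

        done.await(10, TimeUnit.SECONDS);
        if (pool != null) {
            pool.shutdown(); // stop accepting tasks and let the workers exit
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("inline peak: " + peakConcurrency(4, 0));
        System.out.println("pooled peak: " + peakConcurrency(4, 4));
    }
}
```

With inline processing the peak is 1, which matches the one-by-one behavior from the question. With a pool of 4 the peak should be above 1, because each callback returns as soon as the task is queued. In a real service the pool should also be shut down when the application stops, otherwise its worker threads can keep the JVM alive.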