首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Redisson异步处理消息

Redisson异步处理消息
EN

Stack Overflow用户
提问于 2021-03-12 10:40:07
回答 1查看 603关注 0票数 1

作为message,我试图为我的项目提供Redisson特性,我有一个问题。是否有可能将Redisson推送到异步接收的接收消息?我创建了一个小示例,从不同的URL发送了4条消息。我希望Redisson异步地处理它们,但是它一个接一个地完成了。在此,执行:

代码语言:javascript
复制
public class RedisListenerServiceImpl implements MessageListener<String> {

    private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void onMessage(CharSequence channel, String stringMsg) {

        log.info("Message received: {}", stringMsg);
        MessageDto msg;
        try {
            msg = objectMapper.readValue(stringMsg, MessageDto.class);
        } catch (final IOException e) {
            log.error("Unable to deserialize message: {}", e.getMessage(), e);
            return;
        }

        try {
            //Do my stuff
        } catch (Exception e) {
            log.error("Unable to get service from factory: {}", e.getMessage(), e);
        }

    }
}

以及配置:

代码语言:javascript
复制
@Configuration
public class RedisListenerConfig {

    @Autowired
    public RedisListenerConfig(RedissonClient redisClient,
                               MessageListener redisListenerService,
                               @Value("${redis.sub.key}") String redisSubKey) {

        RTopic subscribeTopic = redisClient.getTopic(redisSubKey);
        subscribeTopic.addListenerAsync(String.class, redisListenerService);
    }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-03-13 01:47:24

这是预期的行为。如果希望在触发侦听器onMessage()方法时并发处理消息,只需使用线程池来处理。

因为Redisson不知道要使用多少个线程来使用触发的事件,所以它将实现细节留给您。

代码语言:javascript
复制
public class RedisListenerServiceImpl implements MessageListener<String> {

private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
private final ObjectMapper objectMapper = new ObjectMapper();
private final ExecutorService executorService = Executors.newFixedThreadPool(10);

@Override
public void onMessage(CharSequence channel, String stringMsg) {

    log.info("Message received: {}", stringMsg);
    MessageDto msg;
    try {
        msg = objectMapper.readValue(stringMsg, MessageDto.class);
        executorService.submit(()->{
        System.out.println("do something with message: "+msg);
    });
    } catch (final IOException e) {
        log.error("Unable to deserialize message: {}", e.getMessage(), e);
        return;
    }

    try {
        //Do my stuff
    } catch (Exception e) {
        log.error("Unable to get service from factory: {}", e.getMessage(), e);
    }

}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66598346

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档