首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >消息得到错误"Dispatcher没有订阅者“

消息得到错误"Dispatcher没有订阅者“
EN

Stack Overflow用户
提问于 2019-07-08 22:49:52
回答 2查看 289关注 0票数 1

我的应用程序成功地发送了Kafka消息,但只有在Kafka被初始化之后。在此之前,我得到的错误是"Dispatcher没有订阅者“。如何等待订阅者完成频道注册?

下面是事件顺序的跟踪(second.ms中的时间安排):

  • 17.165 SenderClass创建
  • 17.816初始化类,@PostConstruct启动PollingTask
  • 24.781 PollingTask发送第一条卡夫卡消息
  • 24.816第一个错误:"Dispatcher没有订阅者“
  • 25.778注册MessageChannel i-通道
  • 仍然看到调度员错误
  • 27.067频道我的频道‘有一个订户
  • 在此之后不再出现错误,消息可以发送。

我不知道该怎么处理。疯狂的猜测包括:

  1. 将发送代码放在@PostConstruct中
  2. 将@AutoConfigureBefore(BindingServiceConfiguration.class)添加到发件人
  3. 将@AutoConfigureAfter(BindingServiceConfiguration.class)添加到SenderClass
  4. 将@AutoConfigureBefore(BindingServiceConfiguration.class)添加到Main
  5. 在任务上放置@DependsOn({"EnableBindingClass"})
  6. Place @DependsOn({" ApplicationLifeCycle "})在SchedulerClass上,其中ApplicationLifeCycle是一个只实现SmartLifecycle的类,getPhase返回MAX_INT
  7. 确保整个包上有ComponentScan (来自其他SO线程的建议)
  8. 上述各种组合

创建了一个新的应用程序,使它尽可能简单:

代码语言:javascript
复制
public interface Source {
  @Output(channelName)
  MessageChannel outboundChannel();
}
代码语言:javascript
复制
@EnableBinding(Source.class)
@Component
public class Sender {
  @Autowired
  private Source source;

  public boolean send(SomeObject object) {
    return source.outboundChannel().send(MessageBuilder.withPayload(object).build());
  }
代码语言:javascript
复制
@Service
public class Scheduler {
  @Autowired
  Sender sender;
  @Autowired
  ThreadPoolTaskScheduler taskScheduler;

  @PostConstruct
  public void initialize() {
    taskScheduler.schedule(new PollingTask(), nextTime);
  }

  private class PollingTask implements Runnable {
    @Override
    public void run() {
      List<SomeObject> objects = getDummyData();
      for(SomeObject object : objects)
      {
        sender.send(interval);
      }

      Instant nextTime = Instant.now().plusMillis(1_000L);
      try {
        taskScheduler.schedule(new PollingTask(), nextTime);
      } catch (Exception e) {
        logger.error(e);
      }
    }
}

编辑添加解决方案

现在起作用了!在启动发送消息的调度程序中,我从@PostConstruct中的启动事项切换到了SmartLifecycle::start()。

代码语言:javascript
复制
@Service
public class Scheduler implements SmartLifecycle {
  @Autowired
  Sender sender;
  @Autowired
  ThreadPoolTaskScheduler taskScheduler;

  @Override
  public void start() {
    taskScheduler.schedule(new PollingTask(), nextTime);
  }

  private class PollingTask implements Runnable {
    @Override
    public void run() {
      List<SomeObject> objects = getDummyData();
      for(SomeObject object : objects)
      {
        sender.send(interval);
      }

      Instant nextTime = Instant.now().plusMillis(1_000L);
      try {
        taskScheduler.schedule(new PollingTask(), nextTime);
      } catch (Exception e) {
        logger.error(e);
      }
    }
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-07-08 23:14:05

@PostConstruct发送消息为时尚早;上下文仍在构建中。Implememt,将bean放到一个高阶段(Integer.MAX_VALUE),并在start()中执行发送。

或者发送一个ApplicationRunner。

票数 1
EN

Stack Overflow用户

发布于 2022-03-30 06:41:44

我在Webflux +中遇到了类似的问题。2022年的春云函数是首选的方法。​

经过大量调试,我的假设是bean没有按正确的顺序创建。在卡夫卡消息处理开始之前,这个bean可能还没有在春云流的分配器中注册。“跟加里提到的差不多。”

因此,我在我的消费豆类之前添加了@Order(1)。希望在调度程序注册开始之前就创建这个bean。

代码语言:javascript
复制
   ​@Bean
   ​@Order(1)
   ​public Function<Flux<Message<Pojo>>, Mono<Void>> pojoConsumer() {

这似乎解决了我现在的问题。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56943397

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档