我的应用程序成功地发送了Kafka消息,但只有在Kafka被初始化之后。在此之前,我得到的错误是"Dispatcher没有订阅者“。如何等待订阅者完成频道注册?
下面是事件顺序的跟踪(second.ms中的时间安排):
我不知道该怎么处理。疯狂的猜测包括:
创建了一个新的应用程序,使它尽可能简单:
public interface Source {
@Output(channelName)
MessageChannel outboundChannel();
}@EnableBinding(Source.class)
@Component
public class Sender {
@Autowired
private Source source;
public boolean send(SomeObject object) {
return source.outboundChannel().send(MessageBuilder.withPayload(object).build());
}@Service
public class Scheduler {
@Autowired
Sender sender;
@Autowired
ThreadPoolTaskScheduler taskScheduler;
@PostConstruct
public void initialize() {
taskScheduler.schedule(new PollingTask(), nextTime);
}
private class PollingTask implements Runnable {
@Override
public void run() {
List<SomeObject> objects = getDummyData();
for(SomeObject object : objects)
{
sender.send(interval);
}
Instant nextTime = Instant.now().plusMillis(1_000L);
try {
taskScheduler.schedule(new PollingTask(), nextTime);
} catch (Exception e) {
logger.error(e);
}
}
}编辑添加解决方案
现在起作用了!在启动发送消息的调度程序中,我从@PostConstruct中的启动事项切换到了SmartLifecycle::start()。
@Service
public class Scheduler implements SmartLifecycle {
@Autowired
Sender sender;
@Autowired
ThreadPoolTaskScheduler taskScheduler;
@Override
public void start() {
taskScheduler.schedule(new PollingTask(), nextTime);
}
private class PollingTask implements Runnable {
@Override
public void run() {
List<SomeObject> objects = getDummyData();
for(SomeObject object : objects)
{
sender.send(interval);
}
Instant nextTime = Instant.now().plusMillis(1_000L);
try {
taskScheduler.schedule(new PollingTask(), nextTime);
} catch (Exception e) {
logger.error(e);
}
}
}发布于 2019-07-08 23:14:05
@PostConstruct发送消息为时尚早;上下文仍在构建中。Implememt,将bean放到一个高阶段(Integer.MAX_VALUE),并在start()中执行发送。
或者发送一个ApplicationRunner。
发布于 2022-03-30 06:41:44
我在Webflux +中遇到了类似的问题。2022年的春云函数是首选的方法。
经过大量调试,我的假设是bean没有按正确的顺序创建。在卡夫卡消息处理开始之前,这个bean可能还没有在春云流的分配器中注册。“跟加里提到的差不多。”
因此,我在我的消费豆类之前添加了@Order(1)。希望在调度程序注册开始之前就创建这个bean。
@Bean
@Order(1)
public Function<Flux<Message<Pojo>>, Mono<Void>> pojoConsumer() {这似乎解决了我现在的问题。
https://stackoverflow.com/questions/56943397
复制相似问题