我有一个项目,它包含Actuator和Kafka绑定器。我正在探索的bindings/执行器,并试图阻止一个生产者作为演习。我通过curl发出以下邮件请求:
curl -v 'localhost:8081/actuator/bindings/producer-out-0' -H 'content-type: application/json' -d '{"state": "STOPPED"}'
实际结果:查询返回204。生产者的状态(从GET /致动器/绑定/生产者-out-0中看到)现在是stopped。但是,生产者仍然在产生消息,这可以从有关这个主题的日志记录和使用者活动中看到。
预期结果:我希望制片人停止产生信息。(我也尝试使用暂停状态,该状态也返回204,但错误日志表明不能暂停该生产者。)
我是否误解了这个执行器的工作原理?当制片人被停止时,是否预期S.C.S.会继续对该制片人进行投票?我唯一知道的文档是这里,但据我所知,它并没有回答我的问题。
背景
我使用的是弹簧引导-启动-启动-父2.5.3,并有启动-web和启动-执行器列出的依赖关系。我想我什么都没错过。
这是生产者/消费者对。如你所见,我正在使用一个可查询的供应商。
@Configuration
@Profile("numbers")
public class NumberHandlers {
private static final Logger LOGGER = LoggerFactory.getLogger(NumberHandlers.class);
@Bean
public Supplier<Integer> producer() {
// Needed an effectively-final mutable integer. Side-bar comments welcome. :P
var counter = new AtomicInteger();
return () -> {
var n = counter.getAndIncrement();
LOGGER.info("Producing number: " + n);
return n;
};
}
@Bean
public Consumer<Integer> consumer() {
return it -> LOGGER.info("Consuming number: " + it);
}
}当我传入numbers配置文件时,这些都是活动的。我的配置在下面。
application.yml:
server:
port: 8081
spring:
cloud:
stream:
kafka:
binder:
brokers: ${env.kafka.bootstrapservers:localhost}
management:
endpoints:
web:
exposure:
include: 'bindings'..。和应用程序-编号:
spring:
cloud:
stream:
poller:
fixedDelay: 5000
bindings:
producer-out-0:
destination: numbers-raw
producer:
partitionCount: 3
consumer-in-0:
destination: numbers-raw
kafka:
bindings:
producer-out-0:
producer:
topic.properties:
# These look weird because they're done as an exercise.
retention.bytes: 10000
retention.ms: 172800000
function:
definition: producer;consumer我正在一个本地主机环境中进行测试,使用的是在主机网络上编写卡夫卡和动物园管理员的对接器。
谢谢!
发布于 2021-08-23 16:36:58
目前不支持生产者绑定的生命周期控制,只有使用者绑定。
https://stackoverflow.com/questions/68895758
复制相似问题