首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡绑定的弹簧云流: /bindings执行器API不会停止生产者

卡夫卡绑定的弹簧云流: /bindings执行器API不会停止生产者
EN

Stack Overflow用户
提问于 2021-08-23 16:03:01
回答 1查看 363关注 0票数 0

我有一个项目,它包含Actuator和Kafka绑定器。我正在探索的bindings/执行器,并试图阻止一个生产者作为演习。我通过curl发出以下邮件请求:

curl -v 'localhost:8081/actuator/bindings/producer-out-0' -H 'content-type: application/json' -d '{"state": "STOPPED"}'

实际结果:查询返回204。生产者的状态(从GET /致动器/绑定/生产者-out-0中看到)现在是stopped。但是,生产者仍然在产生消息,这可以从有关这个主题的日志记录和使用者活动中看到。

预期结果:我希望制片人停止产生信息。(我也尝试使用暂停状态,该状态也返回204,但错误日志表明不能暂停该生产者。)

我是否误解了这个执行器的工作原理?当制片人被停止时,是否预期S.C.S.会继续对该制片人进行投票?我唯一知道的文档是这里,但据我所知,它并没有回答我的问题。

背景

我使用的是弹簧引导-启动-启动-父2.5.3,并有启动-web和启动-执行器列出的依赖关系。我想我什么都没错过。

这是生产者/消费者对。如你所见,我正在使用一个可查询的供应商。

代码语言:javascript
复制
@Configuration
@Profile("numbers")
public class NumberHandlers {
  private static final Logger LOGGER = LoggerFactory.getLogger(NumberHandlers.class);

  @Bean
  public Supplier<Integer> producer() {
    // Needed an effectively-final mutable integer. Side-bar comments welcome. :P
    var counter = new AtomicInteger();
    return () -> {
      var n = counter.getAndIncrement();
      LOGGER.info("Producing number: " + n);
      return n;
    };
  }

  @Bean
  public Consumer<Integer> consumer() {
    return it -> LOGGER.info("Consuming number: " + it);
  }
}

当我传入numbers配置文件时,这些都是活动的。我的配置在下面。

application.yml:

代码语言:javascript
复制
server:
  port: 8081
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${env.kafka.bootstrapservers:localhost}
management:
  endpoints:
    web:
      exposure:
        include: 'bindings'

..。和应用程序-编号:

代码语言:javascript
复制
spring:
  cloud:
    stream:
      poller:
        fixedDelay: 5000
      bindings:
        producer-out-0:
          destination: numbers-raw
          producer:
            partitionCount: 3
        consumer-in-0:
          destination: numbers-raw
      kafka:
        bindings:
          producer-out-0:
            producer:
              topic.properties:
                # These look weird because they're done as an exercise.
                retention.bytes: 10000
                retention.ms: 172800000
    function:
      definition: producer;consumer

我正在一个本地主机环境中进行测试,使用的是在主机网络上编写卡夫卡和动物园管理员的对接器。

谢谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-08-23 16:36:58

目前不支持生产者绑定的生命周期控制,只有使用者绑定。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68895758

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档