首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >spring-kafka不能使用kafka-cluster

spring-kafka不能使用kafka-cluster
EN

Stack Overflow用户
提问于 2019-08-26 21:06:19
回答 1查看 82关注 0票数 3

我已经配置了3个kafka集群,我正在尝试使用spring-kafka。但是当我杀死kafka时,我不能发送其他消息到队列中。

Kafka版本2.0.0 spring-kafka版本2.0.1

kafka-topics.sh --describe -zookeeper=zoo1:2181打印

代码语言:javascript
复制
KAFKA_SWARM_TEST  PartitionCount:1        ReplicationFactor:2     Configs:
        Topic: KAFKA_SWARM_TEST Partition: 0    Leader: 2       Replicas: 1,2   Isr: 2,1

spring-kafka配置

代码语言:javascript
复制
spring.kafka.bootstrap-servers="kafka2:9094,kafka1:9093"

头儿是卡夫卡2号当我杀了kafka1。leader仍然是kafka1。但是spring-kafka会抛出

代码语言:javascript
复制
 Connection to node 1 could not be established.Broker may not be available.
 Discovered group coordinator kafka1:9093

看起来像弹簧-卡夫卡连接只需使用kafka1;

我的java代码

代码语言:javascript
复制
    @GetMapping(path = "/send",produces = MediaType.APPLICATION_JSON_VALUE)
    public JsonNode send() throws JsonProcessingException {
        ObjectNode put = JsonNodeFactory.instance.objectNode().put("status", "success");
        String topic = "KAFKA_SWARM_TEST";
        val msg = MessageBuilder
                .withPayload(objectMapper.writeValueAsString(put))
                .setHeader(KafkaHeaders.TOPIC, topic)
                .build();
        kafkaTemplate.send(msg);
        return put;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("KAFKA_SWARM_TEST", 1, (short) 2);
    }

    @KafkaListener(groupId="#{T(java.util.UUID).randomUUID().toString()}",topics = "KAFKA_SWARM_TEST")
    void testGetInfo(String message) throws IOException {
        log.error("getMessage: =====> " + message);
    }

kafka配置

代码语言:javascript
复制
version: '3.7'

services:

  zoo1:
    image: wurstmeister/zookeeper
    restart: always
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888
  zoo2:
    image: wurstmeister/zookeeper
    restart: always
    ports:
      - 2180:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888


  kafka1:
    image: wurstmeister/kafka
    restart: always
    ports:
    - "9093:9093"
    depends_on:
      - zoo1
      - zoo2
    privileged: true
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_HOST_NAME: $KAFKA_ADVERTISED_HOST_NAME
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181
      KAFKA_LOG_DIRS: /kafka
      KAFKA_SSL_KEYSTORE_LOCATION: /kafka_broker_cert/server.keystore.jks
      KAFKA_SSL_KEYSTORE_PASSWORD: ksstone430
      KAFKA_SSL_KEY_PASSWORD: ksstone430
      KAFKA_SSL_TRUSTSTORE_LOCATION: /kafka_broker_cert/server.truststore.jks
      KAFKA_SSL_TRUSTSTORE_PASSWORD: stsstone430
      KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://:9092,SSL://$KAFKA_ADVERTISED_HOST_NAME:9093"
      KAFKA_SSL_CLIENT_AUTH: required
      LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 60
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "null"
    volumes:
      - ./kafka_broker_cert:/kafka_broker_cert
      - /var/run/docker.sock:/var/run/docker.sock

  kafka2:
    image: wurstmeister/kafka
    restart: always
    ports:
    - "9094:9093"
    depends_on:
      - zoo1
      - zoo2
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_HOST_NAME: $KAFKA_ADVERTISED_HOST_NAME
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181
      KAFKA_LOG_DIRS: /kafka
      KAFKA_SSL_KEYSTORE_LOCATION: /kafka_broker_cert/server.keystore.jks
      KAFKA_SSL_KEYSTORE_PASSWORD: ksstone430
      KAFKA_SSL_KEY_PASSWORD: ksstone430
      KAFKA_SSL_TRUSTSTORE_LOCATION: /kafka_broker_cert/server.truststore.jks
      KAFKA_SSL_TRUSTSTORE_PASSWORD: stsstone430
      KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://:9092,SSL://$KAFKA_ADVERTISED_HOST_NAME:9093"
      KAFKA_SSL_CLIENT_AUTH: required
      LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 60
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "null"
    volumes:
      - ./kafka_broker_cert:/kafka_broker_cert
      - /var/run/docker.sock:/var/run/docker.sock
EN

回答 1

Stack Overflow用户

发布于 2019-08-26 21:38:12

当您杀死其中一个节点(例如,领导者kafka1)时,请尝试检查Kafka集群的新领导者选举是否有效

此外,请检查是否存在覆盖spring.kafka.bootstrap-servers的其他配置。可能有一个bean只指向kafka1:9093作为代理。

但是,即使bootstrap-servers属性仅指向kafka1:9093,在节点调整的情况下,使用者也应该找到代理的其他节点。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57658516

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档