首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka spring集成如何处理Kafka服务器宕机

Kafka spring集成如何处理Kafka服务器宕机
EN

Stack Overflow用户
提问于 2016-03-13 17:40:32
回答 1查看 1.3K关注 0票数 0

我有一个web应用程序,通过读取请求中的一些值发送kafka消息。如果我的kafka服务器宕机了,我该如何发送错误状态。现在,在服务器宕机的情况下,prodcuer不断地尝试连接,并无限地记录以下错误。

错误:

代码语言:javascript
复制
14:56:10.181 [kafka-producer-network-thread | producer-1] WARN  o.a.kafka.common.network.Selector - Error in I/O with localhost/127.0.0.1
java.net.ConnectException: Connection refused

生产者配置为:

代码语言:javascript
复制
<int:channel id="inputToKafka">
    <int:queue/>
</int:channel>

<int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapter"
        auto-startup="true"
        kafka-producer-context-ref="kafkaProducerContext"
        channel="inputToKafka"  >
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="10" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>

<task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>

<int-kafka:producer-context id="kafkaProducerContext">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="localhost:9092"
                                          key-class-type="java.lang.String"
                                          value-class-type="java.lang.String"
                                          sync="true"
                                          send-timeout="10"
                                          topic="test"
                                          key-encoder="kafkaEncoder"
                                          value-encoder="kafkaEncoder"
                                          compression-type="none"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>
<bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder">
</bean>
<bean id="valueEncoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
    <constructor-arg value="com.fastretailing.catalogPlatformSCMProducer.model.ProducerMessage" />
</bean>

发送代码为:

代码语言:javascript
复制
 ProducerMessage k = list.get(0);
    boolean status = false;
    try {
        status = inputToKafka.send(
                MessageBuilder.withPayload(k.getJsonString(k))
                        .setHeader(KafkaHeaders.MESSAGE_KEY, k.getFeedName())  // Note: the header was `messageKey` in earlier versions
                        .setHeader(KafkaHeaders.TOPIC, "test")       // Note: the header was `topic` in earlier versions
                        .build()
        );
    } catch (Exception e) {
        System.out.println("eeeeeeeeeeeeeeeeeeeeeeeeeeeeee");
        System.out.println(e);
    }
    System.out.println(status);
    if (!status){
        System.out.println("errrrrrrrrrrrrrrrrrrrrrrrr");
    }

我总是得到的状态为true。即使kafka服务器宕机了。并且不断出现上述连接被拒绝的错误。

EN

回答 1

Stack Overflow用户

发布于 2016-03-13 22:47:50

inputToKafka中删除轮询器和<queue/>元素。

所有这些状态都表明消息已在通道ok中排队。

将其设为DirectChannel (通过删除队列)意味着适配器send将在调用线程上运行。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35968725

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档