
Kafka producer - Spring Boot application - unable to produce messages

Stack Overflow user
Asked 2020-05-06 21:23:36
Answers: 1, Views: 493, Followers: 0, Votes: 2

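I'm learning Spring Boot and Kafka. I did some research and configured a sample producer application, shown below. However, I'm unable to publish messages. It would be great to get some help with what I'm missing here. I have started the ZooKeeper and Kafka services and made sure the topic is available.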

Config:

    import java.util.HashMap;
    import java.util.Map;

    import com.jpmorgan.sample.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    @Configuration
    public class KafkaConfig {

        @Value("${kafka.bootstrap-servers}")
        private String bootstrapServers;

        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            // list of host:port pairs used for establishing the initial connections to the Kafka cluster
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);

            return props;
        }

        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }

        @Bean
        public KafkaProducer sender() {
            return new KafkaProducer();
        }
    }


Producer Class:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaProducer {

    private static final Logger LOGGER =
            LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String payload) {
        LOGGER.info("sending payload='{}'", payload);
        kafkaTemplate.send("test", payload);
    }
}

Sample Application Class:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaProducerSampleApplication {

    public static void main(final String[] args) {
        SpringApplication.run(KafkaProducerSampleApplication.class, args);
    }

}

I'm just running it from IntelliJ. I can see Moneta start up:

         main] n$WebApplicationLoggingAutoConfiguration : Enabled Moneta Request Logging with exclude-url-patterns: [], http.log-request-headers: [false], http.log-response-headers: [false], http.log-request-entity: [false], http.log-response-entity: [false] and http.max-entity-bytes: [1024]
2020-05-06 18:51:57.539  INFO 4952 --- [           main] c.j.m.b.a.s.cors.CorsAutoConfiguration   : Moneta CORS has been disabled because neither [moneta.cors.allowed-origins] nor [moneta.cors.allowed-origins-regex] has been set
2020-05-06 18:51:57.628  INFO 4952 --- [           main] .m.b.a.a.MonetaActuatorAutoConfiguration : Enabled Moneta defaults for Spring Boot Actuator
2020-05-06 18:51:57.677  INFO 4952 --- [           main] o.s.boot.web.servlet.RegistrationBean    : Filter corsFilter was not registered (disabled)
2020-05-06 18:51:57.860  INFO 4952 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-06 18:51:57.956  WARN 4952 --- [           main] c.j.m.b.startup.ApplicationInfoLoader    : !!!! SEAL ID property [application.seal.id] should be provided as either as a system property or in the application properties file !!!!
2020-05-06 18:51:58.054  INFO 4952 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 4 endpoint(s) beneath base path '/actuator'
2020-05-06 18:51:58.111  INFO 4952 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2020-05-06 18:51:58.114  INFO 4952 --- [           main] c.j.s.KafkaProducerSampleApplication     : Started KafkaProducerSampleApplication in 2.547 seconds (JVM running for 4.376)

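1 Answer

Stack Overflow user

Answered 2020-05-06 21:36:19

Try this approach. You don't need to annotate the producer configuration with @EnableKafka.

Remove @Bean from producerConfigs and then try again; it will work.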

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
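
One more thing that may be worth checking: the code posted in the question defines a `send(String payload)` method, but nothing shown there ever calls it, and the startup log contains no "sending payload" line. If that is the case, no message would be published regardless of the configuration. The sketch below is a plain-Java stand-in (no Spring, no Kafka client) that only illustrates the call flow. `SendTriggerSketch`, `Producer`, and `startApplication` are hypothetical names invented for this illustration, and the `BiConsumer` stands in for `kafkaTemplate.send(topic, payload)`. In a real Spring Boot application, the call could be made from, for example, a `CommandLineRunner` bean or a controller method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/**
 * Plain-Java stand-in (no Spring, no Kafka) illustrating that a producer
 * only publishes when some code actually calls send().
 */
public class SendTriggerSketch {

    /** Hypothetical stand-in for the KafkaProducer wrapper from the question. */
    public static class Producer {
        // Stand-in for kafkaTemplate.send(topic, payload); an assumption for this sketch.
        private final BiConsumer<String, String> template;

        public Producer(BiConsumer<String, String> template) {
            this.template = template;
        }

        public void send(String payload) {
            template.accept("test", payload); // same topic name as in the question
        }
    }

    /**
     * Wires a producer to an in-memory list and, optionally, runs a startup
     * hook that calls send(). Returns the recorded "topic:payload" entries.
     */
    public static List<String> startApplication(boolean withStartupHook) {
        List<String> published = new ArrayList<>();
        Producer producer =
                new Producer((topic, payload) -> published.add(topic + ":" + payload));
        if (withStartupHook) {
            // Without a call like this, the producer exists but never sends anything.
            producer.send("hello");
        }
        return published;
    }

    public static void main(String[] args) {
        System.out.println(startApplication(false)); // prints []
        System.out.println(startApplication(true));  // prints [test:hello]
    }
}
```

With the hook disabled, the application "starts" and the producer is fully wired, yet the list of published messages stays empty, which matches the behavior described in the question.
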
Votes: 2
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/61636349