首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka Consumer with JSON?

Kafka Consumer with JSON?
EN

Stack Overflow用户
提问于 2021-03-17 07:23:16
回答 1查看 66关注 0票数 0

可以使用Kafka从post HTTP请求中获取JSON对象,将它们放入主题中,然后将它们发送给消费者(数据库)吗?

顺便说一句,下面是我的KafkaConfig类:

代码语言:javascript
复制
@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaTemplate<String, User> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
    @Bean
    static public ProducerFactory<String,User> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public ConsumerFactory<String,User> consumerFactory(){
        Map<String, Object> config =  new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG,"group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,User> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,User> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-03-17 08:44:38

我假设您知道如何使用spring项目创建post REST point。基本上,在从端点获得json输入之后,只需使用kafkaTemplate引用将json对象发送到kafka即可。像这样的伪代码

代码语言:javascript
复制
@RestController
class ExampleController

@Autowired
private final KafkaTemplate kafkaTemplate;

@PostMapping("/anyPath")
public void post(final ObjectAsJson yourObject) {

   kafkaTemplate.doSend​(// here map your object to a Producer Record)
   // depending on your use you can return a custom success response
}

然后,您可以将一个方法与KafkaListener注释连接起来,以使用该方法并将其写入数据库。

代码语言:javascript
复制
@KafkaListener(topics = "topicName", groupId = "foo", containerFactory = "kafkaListenerContainerFactory")
public void listen(YourCustomObject message) {
    // with kafkaListenerContainerFactory it should deserialise it to your desired object and here you can just write your database insertion here
}

另外,我会看看Kafka Connect,它对像这样的集成很有帮助,你想要实现http作为源,数据库作为接收器,kafka主题介于两者之间。

希望对您有所帮助。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66664874

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档