
无法以AVRO格式从卡夫卡生产商发送GenericRecord数据

Stack Overflow user
Asked on 2018-08-09 19:01:46
2 answers · 3.3K views · 0 followers · 1 vote

Using Confluent OSS 5.0.0-2.11, my Kafka producer code is:

public class AvroProducer {
 public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("ZOOKEEPER_HOST", "localhost");
        //props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        String topic = "confluent-new";

        Schema.Parser parser = new Schema.Parser();
// I will get below schema string from SCHEMA REGISTRY
        Schema schema = parser.parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"location\",\"type\":\"string\",\"default\":\"Noida\"}]}");

        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
        GenericRecord record = new GenericData.Record(schema);
        record.put("uID", "06080000");
        record.put("userName", "User data10");
        record.put("company", "User data10");
        record.put("age", 12);
        record.put("location", "User data10");

        ProducerRecord<String, GenericRecord> recordData = new ProducerRecord<String, GenericRecord>(topic, "ip", record);
        producer.send(recordData);

        System.out.println("Message Sent");
    }

}

The producer code seems fine; I can see "Message Sent" printed on the console.

The Kafka consumer code is:

public class AvroConsumer {
public static void main(String[] args) throws ExecutionException, InterruptedException {

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("ZOOKEEPER_HOST", "localhost");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("group.id", "consumer1");
    props.put("auto.offset.reset", "latest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("schema.registry.url", "http://localhost:8081");
    String topic = "confluent-new";

    KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
    consumer.subscribe(Arrays.asList(topic));
    while(true){
        ConsumerRecords<String, GenericRecord> recs = consumer.poll(10000);
        for (ConsumerRecord<String, GenericRecord> rec : recs) {
            System.out.printf("{AvroUtilsConsumerUser}: Received [key= %s, value= %s]\n", rec.key(), rec.value());
        }
    }
}

}

I cannot see the message (data) on the Kafka consumer side. I also checked the offset count/status of the confluent-new topic, and it is not updating. It seems something is wrong with the producer code. Any pointers would be helpful.

Meanwhile, the producer code below is working; here the POJO, i.e. User, was generated by avro-tools.

public class AvroProducer {
 public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();
        /*kafkaParams.put("auto.offset.reset", "smallest");
        kafkaParams.put("ZOOKEEPER_HOST", "bihdp01");*/
        props.put("bootstrap.servers", "localhost:9092");
        props.put("ZOOKEEPER_HOST", "localhost");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        String topic = "confluent-new";

        Producer<String, User> producer = new KafkaProducer<String, User>(props);
        User user = new User();
        user.setUID("0908");
        user.setUserName("User data10");
        user.setCompany("HCL");
        user.setAge(20);
        user.setLocation("Noida");
        ProducerRecord<String, User> record = new ProducerRecord<String, User>(topic, (String) user.getUID(), user);
        producer.send(record).get();
        System.out.println("Sent");
    }

}

P.S. My requirement is to send JSON data received from a source Kafka topic to a destination Kafka topic in AVRO format. First, I infer the AVRO schema from the received JSON data using AVRO4S and register the schema with the Schema Registry. Next, I extract the data from the received JSON, populate it into a GenericRecord instance, and send this GenericRecord instance to the Kafka topic using KafkaAvroSerializer. On the consumer side, I will use KafkaAvroDeserializer to deserialize the received AVRO data.
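The GenericRecord-population step described above can be sketched as follows. This is a minimal sketch using a trimmed two-field version of the User schema from the question; the `BuildUserRecord` class and `build` method names are my own, not from the original code.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class BuildUserRecord {
    // Build a GenericRecord from values already extracted from incoming JSON.
    // The schema here is inlined for illustration; in the pipeline described
    // above it would come from the Schema Registry.
    public static GenericRecord build(String userName, String uID) {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"userName\",\"type\":\"string\"},"
            + "{\"name\":\"uID\",\"type\":\"string\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("userName", userName);
        record.put("uID", uID);
        return record;
    }
}
```

A record built this way can then be handed to a `Producer<String, GenericRecord>` configured with `KafkaAvroSerializer`, exactly as in the first producer above.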


2 Answers

Stack Overflow user

Accepted answer

Posted on 2018-08-12 15:41:49

While looking for a solution, I tried Thread.sleep(1000), and it fixed my problem. I also tried producer.send(record).get(), which also fixed it. After going through the documentation, I came across the snippet below, which hints at the solution.

// When you're finished producing records, you can flush the producer
// to ensure it has all been written to Kafka and then close the
// producer to free its resources.

finally {
  producer.flush();
  producer.close();
}

This is the best way to solve the problem.
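The underlying issue is that KafkaProducer.send() is asynchronous: the question's main() returns before the buffered record is ever transmitted, so flushing (or blocking on the returned Future) must happen before the JVM exits. A minimal sketch of the flush-and-close pattern, factored into a helper so it can be exercised with a mock producer (the `SafeSend` class and `sendAndClose` method names are hypothetical):

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SafeSend {
    // Send a record, then always flush and close the producer, so buffered
    // records are written to Kafka before the process exits.
    public static <K, V> void sendAndClose(Producer<K, V> producer,
                                           ProducerRecord<K, V> record) {
        try {
            producer.send(record);          // asynchronous: only enqueues
        } finally {
            producer.flush();               // block until buffered records are sent
            producer.close();               // release network/IO resources
        }
    }
}
```

In the first producer from the question, calling this helper (or simply adding the same try/finally around producer.send(recordData)) makes the record reach the broker before main() returns.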

Votes: 2

Stack Overflow user

Posted on 2018-08-10 17:20:22

Please try adding get() in the first producer:

producer.send(recordData).get();
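This works because KafkaProducer.send() returns a Future&lt;RecordMetadata&gt;; calling get() blocks until the broker acknowledges the write (or rethrows the send error), so the record is delivered before main() exits. A minimal sketch of the synchronous-send pattern (the `SyncSend` helper name is my own):

```java
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSend {
    // Send synchronously: block on the Future so delivery is confirmed
    // (and failures surface as exceptions) before returning.
    public static <K, V> RecordMetadata sendSync(Producer<K, V> producer,
                                                 ProducerRecord<K, V> record)
            throws Exception {
        Future<RecordMetadata> future = producer.send(record);
        return future.get(); // blocks until the broker acks, or throws
    }
}
```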
Votes: 1
Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/51774107