
NullPointerException when changing String to Avro in Kafka Streams using KStream

Stack Overflow user
Asked 2017-06-08 04:28:55
1 answer, 1.4K views, 0 followers, 0 votes

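I'm stuck on this problem and can't figure out what is going on. I am trying to use Kafka Streams to write a log to a topic. On the other end, I have Kafka Connect inserting each entry into MySQL. So basically, what I need is a Kafka Streams program that reads a topic as strings, parses each message into Avro format, and writes it to another topic.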

Here is the code I wrote:

        //Define schema
        String userSchema = "{"
                + "\"type\":\"record\","
                + "\"name\":\"myrecord\","
                + "\"fields\":["
                + "  { \"name\":\"ID\", \"type\":\"int\" },"
                + "  { \"name\":\"COL_NAME_1\", \"type\":\"string\" },"
                + "  { \"name\":\"COL_NAME_2\", \"type\":\"string\" }"
                + "]}";

        String key = "key1";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);

        System.out.println("Kafka Streams Demonstration");
        //Settings
        Properties settings = new Properties();
        // Set a few key parameters
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        // Kafka bootstrap server (broker to talk to); ubuntu is the host name for my VM running Kafka, port 9092 is where the (single) broker listens
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Apache ZooKeeper instance keeping watch over the Kafka cluster; ubuntu is the host name for my VM running Kafka, port 2181 is where the ZooKeeper listens
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        // default serdes for serializing and deserializing key and value from and to streams in case no specific Serde is specified
        settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        settings.put(StreamsConfig.STATE_DIR_CONFIG ,"/tmp");
        // to work around exception Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
        // at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
        // see: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo

        // Create an instance of StreamsConfig from the Properties instance
        StreamsConfig config = new StreamsConfig(getProperties());
        final Serde < String > stringSerde = Serdes.String();
        final Serde < Long > longSerde = Serdes.Long();
        final Serde<byte[]> byteArraySerde = Serdes.ByteArray();

        // building Kafka Streams Model                                                                                                                                                       
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        // the source of the streaming analysis is the topic with country messages                                                                                                            
        KStream<byte[], String> instream =
            kStreamBuilder.stream(byteArraySerde, stringSerde, "sqlin");

       final KStream<byte[], GenericRecord> outstream = instream.mapValues(new ValueMapper<String, GenericRecord>() {
            @Override
            public GenericRecord apply(final String record) {
                System.out.println(record);
                GenericRecord avroRecord = new GenericData.Record(schema);
                String[] array = record.split(" ", -1);
                for (int i = 0; i < array.length; i = i + 1) {
                    if (i == 0)
                        avroRecord.put("ID", Integer.parseInt(array[0]));
                    if (i == 1)
                        avroRecord.put("COL_NAME_1", array[1]);
                    if (i == 2)
                        avroRecord.put("COL_NAME_2", array[2]);
                }
                System.out.println(avroRecord);
                return avroRecord;
            }
          });
        outstream.to("sqlout");

This is the output, ending in the NullPointerException:

        -cp streams-examples-3.2.1-standalone.jar io.confluent.examples.streams.sql
        Kafka Streams Demonstration
        Start
        Now started CountriesStreams Example
        5 this is
        {"ID": 5, "COL_NAME_1": "this", "COL_NAME_2": "is"}
        Exception in thread "StreamThread-1" java.lang.NullPointerException
            at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
            at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
            at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(...)
            at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
            at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
            at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
            at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
            at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
            at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
            at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

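The topic sqlin contains messages that consist of a number followed by two words. Note the two printed lines: the function receives a message and parses it successfully before the null pointer exception is thrown. The problem is that I'm new to Java, Kafka, and Avro, so I can't tell where I went wrong. Did I set up the Avro schema correctly, or am I using it incorrectly? Any help here would be appreciated.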
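
For reference, the parsing step described above can be tried on its own, without Kafka or Avro on the classpath. The following is only a sketch: RecordLineParser is a hypothetical class name, a LinkedHashMap stands in for the GenericRecord, and it assumes every message is exactly one integer and two words separated by single spaces.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the original program.
final class RecordLineParser {

    // Parses "<int> <word> <word>" into the three fields named in the schema.
    static Map<String, Object> parse(String line) {
        String[] parts = line.split(" ", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "expected 3 space-separated fields, got " + parts.length + ": " + line);
        }
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("ID", Integer.parseInt(parts[0])); // throws NumberFormatException if not an int
        fields.put("COL_NAME_1", parts[1]);
        fields.put("COL_NAME_2", parts[2]);
        return fields;
    }

    public static void main(String[] args) {
        // prints {ID=5, COL_NAME_1=this, COL_NAME_2=is}
        System.out.println(parse("5 this is"));
    }
}
```

The length check makes a malformed line fail loudly instead of silently producing a record with missing fields.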


1 Answer

Stack Overflow user

Answered 2017-06-08 10:39:49

I think the problem is the following line:

outstream.to("sqlout");

By default, your application is configured to use String serdes for both record keys and record values:

settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

Since outstream has the type KStream<byte[], GenericRecord>, you must provide explicit serdes when calling to():

// sth like
outstream.to(Serdes.ByteArray(), yourGenericAvroSerde, "sqlout");
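
To illustrate the type mismatch in isolation, here is a toy model in plain Java. Everything in it (SerdeFallbackDemo, Row, ToBytes, send) is hypothetical and is not Kafka's API; it only mimics the idea that a sink with no explicit serializer falls back to the configured default, which here expects a String. It does not reproduce the exact exception from the stack trace in the question.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Toy model only: these types are hypothetical and are NOT Kafka classes.
final class SerdeFallbackDemo {

    // Stand-in for the parsed record (ID, COL_NAME_1, COL_NAME_2).
    static final class Row {
        final int id;
        final String col1;
        final String col2;

        Row(int id, String col1, String col2) {
            this.id = id;
            this.col1 = col1;
            this.col2 = col2;
        }
    }

    // Stand-in for a value serializer: turns a value into bytes.
    interface ToBytes<T> extends Function<T, byte[]> {}

    // Stand-in for the configured default: it assumes every value is a String.
    static final ToBytes<Object> DEFAULT_STRING =
            value -> ((String) value).getBytes(StandardCharsets.UTF_8);

    // Like calling to(topic): no serializer supplied, so the default is used.
    static byte[] send(Object value) {
        return DEFAULT_STRING.apply(value);
    }

    // Like calling to(keySerde, valueSerde, topic): the caller supplies the serializer.
    static <T> byte[] send(T value, ToBytes<T> explicit) {
        return explicit.apply(value);
    }

    public static void main(String[] args) {
        Row row = new Row(5, "this", "is");
        try {
            send(row); // the default expects a String, so the cast fails
        } catch (ClassCastException e) {
            System.out.println("default String serializer cannot handle Row");
        }
        byte[] bytes = send(row, r -> (r.id + " " + r.col1 + " " + r.col2)
                .getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

In the real program the explicit serializer would be an Avro serde for GenericRecord rather than the string concatenation used in this sketch.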

FYI: The next version of Confluent (ETA: this month = June 2017) will ship ready-to-use generic + specific Avro serdes that integrate with the Confluent schema registry. This will make your life easier.

For more details, see my answer at https://stackoverflow.com/a/44433098/1743580.

Votes: 2
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/44426657
