
Camus - Kafka example

Stack Overflow user
Asked on 2014-12-31 13:08:14
4 answers · 1.1K views · 0 followers · Score 2

My use case is that I want to push Avro data from Kafka to HDFS. Camus seems to be the right tool, but I am unable to make it work. I am new to Camus and trying to get the camus-example working: https://github.com/linkedin/camus

Right now I am trying to get the Camus example running, but I am still facing some issues.

Code snippet for DummyLogKafkaProducerClient:

package com.linkedin.camus.example.schemaregistry;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageEncoder;
import com.linkedin.camus.example.records.DummyLog;

public class DummyLogKafkaProducerClient {


    public static void main(String[] args) {

        Properties props = new Properties();

        props.put("metadata.broker.list", "localhost:6667");
        // props.put("serializer.class", "kafka.serializer.StringEncoder");
        // props.put("partitioner.class", "example.producer.SimplePartitioner");
        //props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, byte[]> producer = new Producer<String, byte[]>(config);

        KafkaAvroMessageEncoder encoder = get_DUMMY_LOG_Encoder();

        for (int i = 0; i < 500; i++) {
            KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>("DUMMY_LOG", encoder.toBytes(getDummyLog()));
            producer.send(data);

        }
    }

    public static DummyLog getDummyLog() {
        Random random = new Random();
        DummyLog dummyLog = DummyLog.newBuilder().build();
        dummyLog.setId(random.nextLong());
        dummyLog.setLogTime(new Date().getTime());
        Map<CharSequence, CharSequence> machoStuff = new HashMap<CharSequence, CharSequence>();
        machoStuff.put("macho1", "abcd");
        machoStuff.put("macho2", "xyz");
        dummyLog.setMuchoStuff(machoStuff);
        return dummyLog;
    }

    public static KafkaAvroMessageEncoder get_DUMMY_LOG_Encoder() {
        KafkaAvroMessageEncoder encoder = new KafkaAvroMessageEncoder("DUMMY_LOG", null);
        Properties props = new Properties();
        props.put(KafkaAvroMessageEncoder.KAFKA_MESSAGE_CODER_SCHEMA_REGISTRY_CLASS, "com.linkedin.camus.example.schemaregistry.DummySchemaRegistry");
        encoder.init(props, "DUMMY_LOG");
        return encoder;

    }
}

I also added a default no-arg constructor to DummySchemaRegistry, because it was throwing an InstantiationException.

package com.linkedin.camus.example.schemaregistry;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;

import com.linkedin.camus.example.records.DummyLog;
import com.linkedin.camus.example.records.DummyLog2;
import com.linkedin.camus.schemaregistry.MemorySchemaRegistry;

/**
 * This is a little dummy registry that just uses a memory-backed schema registry to store two dummy Avro schemas. You
 * can use this with camus.properties
 */
public class DummySchemaRegistry extends MemorySchemaRegistry<Schema> {
    public DummySchemaRegistry(Configuration conf) {
        super();
        super.register("DUMMY_LOG", DummyLog.newBuilder().build().getSchema());
        super.register("DUMMY_LOG_2", DummyLog2.newBuilder().build()
                .getSchema());
    }
    public DummySchemaRegistry() {
        super();
        super.register("DUMMY_LOG", DummyLog.newBuilder().build().getSchema());
        super.register("DUMMY_LOG_2", DummyLog2.newBuilder().build().getSchema());
    }
}

Exception trace obtained after running the program:

Exception in thread "main" com.linkedin.camus.coders.MessageEncoderException: org.apache.avro.AvroRuntimeException: org.apache.avro.AvroRuntimeException: Field id type:LONG pos:0 not set and has no default value
    at com.linkedin.camus.example.schemaregistry.DummyLogKafkaProducerClient.get_DUMMY_LOG_Encoder(DummyLogKafkaProducerClient.java:57)
    at com.linkedin.camus.example.schemaregistry.DummyLogKafkaProducerClient.main(DummyLogKafkaProducerClient.java:32)
Caused by: org.apache.avro.AvroRuntimeException: org.apache.avro.AvroRuntimeException: Field id type:LONG pos:0 not set and has no default value
    at com.linkedin.camus.example.records.DummyLog$Builder.build(DummyLog.java:214)
    at com.linkedin.camus.example.schemaregistry.DummySchemaRegistry.<init>(DummySchemaRegistry.java:16)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
    at java.lang.Class.newInstance(Class.java:438)
    at com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageEncoder.init(KafkaAvroMessageEncoder.java:52)
    ... 2 more
Caused by: org.apache.avro.AvroRuntimeException: Field id type:LONG pos:0 not set and has no default value
    at org.apache.avro.data.RecordBuilderBase.defaultValue(RecordBuilderBase.java:151)
    at com.linkedin.camus.example.records.DummyLog$Builder.build(DummyLog.java:209)
    ... 9 more
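For context on the InstantiationException mentioned above: the stack trace shows the registry class being instantiated reflectively via java.lang.Class.newInstance, which only works when the class has a no-arg (nullary) constructor. A minimal, self-contained sketch of that behavior (plain JDK only; RegistryNoArg and RegistryWithArg are made-up stand-ins for illustration, not Camus classes):

```java
// Demonstrates why a reflectively loaded class (such as a schema registry
// configured by class name) needs a no-arg constructor:
// Class.newInstance() throws InstantiationException otherwise.
public class NoArgDemo {
    public static class RegistryWithArg {
        public RegistryWithArg(String name) { }
    }

    public static class RegistryNoArg {
        public RegistryNoArg() { }
    }

    public static void main(String[] args) throws Exception {
        // Succeeds: a nullary constructor is available.
        Object ok = RegistryNoArg.class.newInstance();
        System.out.println("no-arg: " + (ok != null));

        // Fails the same way a registry without a no-arg
        // constructor would when loaded reflectively.
        try {
            RegistryWithArg.class.newInstance();
        } catch (InstantiationException e) {
            System.out.println("with-arg: InstantiationException");
        }
    }
}
```

This is why adding the default constructor to DummySchemaRegistry removes the InstantiationException, although (as the trace shows) the record builder then still fails on fields without defaults.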


4 Answers

Stack Overflow user

Answered on 2015-02-09 23:56:14

I think Camus expects the Avro schema to have default values. I changed my dummyLog.avsc to the following and recompiled -

{"namespace": "com.linkedin.camus.example.records", "type": "record", "name": "DummyLog", "doc": "Logs for not so important stuff.", "fields": [{"name": "id", "type": "int", "default": 0}, {"name": "logTime", "type": "int", "default": 0}]}

Please let me know if it works for you.

Thanks, Ambarish

Score 1

Stack Overflow user

Answered on 2015-03-24 05:14:56

You can default any string or long field as shown below:

  {"type":"record","name":"CounterData","namespace":"org.avro.usage.tutorial","fields":[{"name":"word","type":["string","null"]},{"name":"count","type":["long","null"]}]}
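A note on the union form above: Avro also allows an explicit default for a nullable field, and the Avro specification requires that a union field's default value match the first branch of the union, so null must come first in that case. A hypothetical field written that way:

```json
{"name": "count", "type": ["null", "long"], "default": null}
```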
Score 0

Stack Overflow user

Answered on 2015-09-30 10:53:34

Camus does not assume the schema will have default values. I used Camus recently and ran into the same issue. Actually, the way the schema registry is used in the default example is not correct. I made some modifications to the Camus code, which you can find at https://github.com/chandanbansal/camus; there are minor changes to make it work. They don't have a decoder for Avro records, so I wrote that as well.

Score 0
The original content of this page was provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/27721597
