
Running the Camus example with Kafka 0.8

Stack Overflow user
Asked on 2014-02-02 08:01:10
1 answer · 3.7K views · 0 following · 2 votes

I'm new to Camus and want to try it out. So far I have downloaded the source code, set up Kafka 0.8, created two queues as the example expects, configured the job properties file (see below), and tried to run the job on my machine (details below) with the following command:

$JAVA_HOME/bin/java -cp camus-example-0.1.0-SNAPSHOT.jar com.linkedin.camus.etl.kafka.CamusJob -P /root/Desktop/camus-workspace/camus-master/camus-example/target/camus.properties

The jar contains all of its dependencies (it is a shaded jar).

And I got the following output and error:

[EtlInputFormat] - Discrading topic : TestQueue
[EtlInputFormat] - Discrading topic : test
[EtlInputFormat] - Discrading topic : DummyLog2
[EtlInputFormat] - Discrading topic : test3
[EtlInputFormat] - Discrading topic : TwitterQueue
[EtlInputFormat] - Discrading topic : test2
[EtlInputFormat] - Discarding topic (Decoder generation failed) : DummyLog
[CodecPool] - Got brand-new compressor
[JobClient] - Running job: job_local_0001
[JobClient] -  map 0% reduce 0%
[JobClient] - Job complete: job_local_0001
[JobClient] - Counters: 0
[CamusJob] - Job finished

When I tried to run it from the IntelliJ IDEA editor, I got the same error, but this time I could see its cause:

java.lang.RuntimeException: java.lang.ClassNotFoundException: com.linkedin.batch.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder

Can someone explain to me what I am doing wrong?
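For context on why this only shows up at runtime: Camus is given the decoder as a fully qualified class name in the properties file and loads it by name, so a wrong package does not break the build, it fails when the job tries to resolve the class. Below is a minimal sketch of that kind of lookup (illustrative only, not Camus's actual code), which can be run with the same shaded jar on the classpath and the properties file passed as the first argument:

import java.io.FileInputStream;
import java.util.Properties;

public class CheckDecoderClass {
    public static void main(String[] args) throws Exception {
        // Load the same file that is passed to CamusJob with -P
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(args[0])) {
            props.load(in);
        }

        String decoderName = props.getProperty("camus.message.decoder.class");
        try {
            // This is where a wrong package name blows up:
            // Class.forName throws ClassNotFoundException if the name
            // does not exist anywhere on the classpath.
            Class<?> decoderClass = Class.forName(decoderName);
            System.out.println("Decoder resolves to " + decoderClass.getName());
        } catch (ClassNotFoundException e) {
            System.out.println("Decoder class not on the classpath: " + decoderName);
        }
    }
}

With the properties file below, this check should hit the "not on the classpath" branch, matching the exception above.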

The Camus properties file:

# Needed Camus properties, more cleanup to come

# final top-level data output directory, sub-directory will be dynamically created for each topic pulled
etl.destination.path=/root/Desktop/camus-workspace/camus-master/camus-example/target/1
# HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files
etl.execution.base.path=/root/Desktop/camus-workspace/camus-master/camus-example/target/2
# where completed Camus job output directories are kept, usually a sub-dir in the base.path
etl.execution.history.path=/root/Desktop/camus-workspace/camus-master/camus-example/target3

# Kafka-0.8 handles all zookeeper calls
#zookeeper.hosts=localhost:2181
#zookeeper.broker.topics=/brokers/topics
#zookeeper.broker.nodes=/brokers/ids

# Concrete implementation of the Encoder class to use (used by Kafka Audit, and thus optional for now)
#camus.message.encoder.class=com.linkedin.batch.etl.kafka.coders.DummyKafkaMessageEncoder

# Concrete implementation of the Decoder class to use
camus.message.decoder.class=com.linkedin.batch.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder

# Used by avro-based Decoders to use as their Schema Registry
kafka.message.coder.schema.registry.class=com.linkedin.camus.example.DummySchemaRegistry

# Used by the committer to arrange .avro files into a partitioned scheme. This will be the default partitioner for all
# topic that do not have a partitioner specified
#etl.partitioner.class=com.linkedin.camus.etl.kafka.coders.DefaultPartitioner

# Partitioners can also be set on a per-topic basis
#etl.partitioner.class.<topic-name>=com.your.custom.CustomPartitioner

# all files in this dir will be added to the distributed cache and placed on the classpath for hadoop tasks
# hdfs.default.classpath.dir=/root/Desktop/camus-workspace/camus-master/camus-example/target

# max hadoop tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=30
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# events with a timestamp older than this will be discarded. 
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1

# if whitelist has values, only whitelisted topic are pulled.  nothing on the blacklist is pulled
kafka.blacklist.topics=
kafka.whitelist.topics=DummyLog
log4j.configuration=true

# Name of the client as seen by kafka
kafka.client.name=camus
# Fetch Request Parameters
kafka.fetch.buffer.size=
kafka.fetch.request.correlationid=
kafka.fetch.request.max.wait=
kafka.fetch.request.min.bytes=
# Connection parameters.
kafka.brokers=localhost:9092
kafka.timeout.value=


#Stops the mapper from getting inundated with Decoder exceptions for the same topic
#Default value is set to 10
max.decoder.exceptions.to.print=5

#Controls the submitting of counts to Kafka
#Default value set to true
post.tracking.counts.to.kafka=true

log4j.configuration=true

# everything below this point can be ignored for the time being, will provide more documentation down the road
##########################
etl.run.tracking.post=false
kafka.monitor.tier=
etl.counts.path=
kafka.monitor.time.granularity=10

etl.hourly=hourly
etl.daily=daily
etl.ignore.schema.errors=false

# configure output compression for deflate or snappy. Defaults to deflate
etl.output.codec=deflate
etl.deflate.level=6
#etl.output.codec=snappy

etl.default.timezone=America/Los_Angeles
etl.output.file.time.partition.mins=60
etl.keep.count.files=false
etl.execution.history.max.of.quota=.8

mapred.output.compress=true
mapred.map.max.attempts=1

kafka.client.buffer.size=20971520
kafka.client.so.timeout=60000

#zookeeper.session.timeout=
#zookeeper.connection.timeout=

Machine details:

Hortonworks HDP 2.0.0.6 and Kafka 0.8 beta 1


1 answer

Stack Overflow user

Accepted answer

Answered on 2014-02-05 11:30:30

There is a mistake in the package name.

Change

camus.message.decoder.class=com.linkedin.batch.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder

to
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder

In addition, you need to either give the Kafka-related fetch properties real values or comment them out (that way Camus falls back to its defaults):

# Fetch Request Parameters
# kafka.fetch.buffer.size=
# kafka.fetch.request.correlationid=
# kafka.fetch.request.max.wait=
# kafka.fetch.request.min.bytes=
# Connection parameters.
kafka.brokers=localhost:9092
# kafka.timeout.value=
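A note on why leaving those fetch parameters blank (as in the original file) behaves differently from commenting them out: these values end up being parsed as numbers, and an empty string cannot be parsed, whereas a key that is absent altogether falls back to a default. The snippet below is a generic illustration of that difference (assuming integer parsing along the lines of Integer.parseInt or Hadoop's Configuration.getInt, not Camus's exact code):

import java.util.Properties;

public class EmptyVsMissingProperty {
    static int getInt(Properties props, String key, int defaultValue) {
        String value = props.getProperty(key);
        if (value == null) {
            // Key commented out / absent: fall back to the default.
            return defaultValue;
        }
        // Key present but empty, e.g. "kafka.fetch.buffer.size=":
        // Integer.parseInt("") throws NumberFormatException.
        return Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("kafka.fetch.buffer.size", "");   // present but blank
        // "kafka.fetch.request.max.wait" is deliberately not set at all

        System.out.println(getInt(props, "kafka.fetch.request.max.wait", 1000)); // prints 1000
        System.out.println(getInt(props, "kafka.fetch.buffer.size", 1048576));   // throws NumberFormatException
    }
}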
3 votes
Original content provided by Stack Overflow.
Original question: https://stackoverflow.com/questions/21508355
