首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Camus的预期提交/回滚行为是什么?

Camus的预期提交/回滚行为是什么?
EN

Stack Overflow用户
提问于 2016-07-10 06:51:43
回答 0查看 238关注 0票数 1

我们已经运行Camus大约一年了,成功地从Kafka (0.82版本)中提取了avro有效载荷,并使用几个Kafka主题将其存储为.avro文件。最近,我们公司内部的一个新团队在我们的预生产环境中注册了大约60个新主题,并开始向这些主题发送数据。该团队在将他们的数据路由到kafka主题时犯了一些错误,这导致了当Camus将这些主题的有效负载反序列化到avro时出现错误。Camus作业由于超过“其他失败”错误阈值而失败。失败后在Camus中产生的行为是令人惊讶的,我想检查一下其他开发人员,看看我们观察到的行为是否是预期的,或者我们的实现是否存在问题。

我们注意到,当Camus作业由于超过'failed other‘阈值而失败时,我们注意到了这种行为: 1.所有映射器任务都成功了,因此TaskAttempt被允许提交-这意味着Camus写入的所有数据都被复制到了最终的HDFS位置。2. CamusJob在计算%错误率(这是在映射器提交之后)时抛出异常,导致作业失败3.因为作业失败(我认为),Kafka偏移量没有提前

我们在此行为中遇到的问题是我们的Camus作业设置为每5分钟运行一次。因此,每隔5分钟,我们就会看到数据提交到HDFS,作业失败,并且Kafka偏移量没有更新-这意味着我们写入重复的数据,直到我们注意到我们的磁盘已满。

我写了一个集成测试来确认结果-它向一个主题提交了10条良好的记录,并向同一主题提交了10条使用意外模式的记录,运行了仅包含该主题白名单的Camus作业,我们可以看到有10条记录被写入HDFS,而Kafka偏移量并未提升。下面是该测试的日志片段,以及我们在运行作业时使用的属性。

感谢任何帮助-我不确定这是加缪的预期行为,还是我们的实现有问题,以及防止这种行为(复制数据)的最好方法是什么。

谢谢~马特

测试的CamusJob属性:

代码语言:javascript
复制
etl.destination.path=/user/camus/kafka/data
etl.execution.base.path=/user/camus/kafka/workspace
etl.execution.history.path=/user/camus/kafka/history
dfs.default.classpath.dir=/user/camus/kafka/libs

etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder

camus.message.timestamp.format=yyyy-MM-dd HH:mm:ss Z
mapreduce.output.fileoutputformat.compress=false

mapred.map.tasks=15
kafka.max.pull.hrs=1
kafka.max.historical.days=3

kafka.whitelist.topics=advertising.edmunds.admax
log4j.configuration=true

kafka.client.name=camus
kafka.brokers=<kafka brokers>
max.decoder.exceptions.to.print=5
post.tracking.counts.to.kafka=true
monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
etl.schema.registry.url=<schema repo url>
etl.run.tracking.post=false
kafka.monitor.time.granularity=10

etl.daily=daily
etl.ignore.schema.errors=false

etl.output.codec=deflate
etl.deflate.level=6
etl.default.timezone=America/Los_Angeles
mapred.output.compress=false
mapred.map.max.attempts=2

来自测试的日志片段,显示映射器成功后的提交行为,以及由于超过“其他”阈值而导致的后续作业失败:

代码语言:javascript
复制
LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map

[Task] - Task:attempt_local866350146_0001_m_000000_0 is done. And is in the process of committing

[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map

[Task] - Task attempt_local866350146_0001_m_000000_0 is allowed to commit now

[EtlMultiOutputFormat] - work path: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0

[EtlMultiOutputFormat] - Destination base path: /user/camus/kafka/data

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.3.2.2.1467979200000.avro

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.7.8.8.1467979200000.avro

[Task] - Task 'attempt_local866350146_0001_m_000000_0' done.
[LocalJobRunner] - Finishing task: attempt_local866350146_0001_m_000000_0
[LocalJobRunner] - map task executor complete.
[Job] -  map 100% reduce 0%
[Job] - Job job_local866350146_0001 completed successfully
[Job] - Counters: 23
File System Counters
FILE: Number of bytes read=117251
FILE: Number of bytes written=350942
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=10
Map output records=15
Input split bytes=793
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=13
Total committed heap usage (bytes)=251658240
com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
DECODE_SUCCESSFUL=10
SKIPPED_OTHER=10
File Input Format Counters 
Bytes Read=0
File Output Format Counters 
Bytes Written=5907
total
data-read=840
decode-time(ms)=123
event-count=20
mapper-time(ms)=58
request-time(ms)=12114
skip-old=0
[CamusJob] - Group: File System Counters
[CamusJob] - FILE: Number of bytes read:    117251
[CamusJob] - FILE: Number of bytes written: 350942
[CamusJob] - FILE: Number of read operations:   0
[CamusJob] - FILE: Number of large read operations: 0
[CamusJob] - FILE: Number of write operations:  0
[CamusJob] - Group: Map-Reduce Framework
[CamusJob] - Map input records: 10
[CamusJob] - Map output records:    15
[CamusJob] - Input split bytes: 793
[CamusJob] - Spilled Records:   0
[CamusJob] - Failed Shuffles:   0
[CamusJob] - Merged Map outputs:    0
[CamusJob] - GC time elapsed (ms):  13
[CamusJob] - Total committed heap usage (bytes):    251658240
[CamusJob] - Group: com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
[CamusJob] - DECODE_SUCCESSFUL: 10
[CamusJob] - SKIPPED_OTHER: 10
[CamusJob] - job failed: 50.0% messages skipped due to other, maximum allowed is 0.1%
EN

回答

页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38287017

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档