首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >星星之火卡夫卡结构化流:发布并发更新的日志。检测到多个流作业

星星之火卡夫卡结构化流:发布并发更新的日志。检测到多个流作业
EN

Stack Overflow用户
提问于 2018-09-13 13:27:43
回答 1查看 2.9K关注 0票数 1

我正在尝试从kafka源运行结构化流,并将其退回到kafka主题。

在我目前的设置中,我正在通过火花提交安排两个火花作业。

每一项工作都有自己独特的卡夫卡主题。但他们都写了一个共同的话题。

我目前的火花违约情况包括:

代码语言:javascript
复制
spark.streaming.concurrentJobs 5
spark.scheduler.mode FAIR

当这两个工作都是独立调度时,它们就会按预期工作。但是,当我试图将它们放在一起时,通过一个接一个地提交,首先提交的作业将停止使用日志进行响应:

代码语言:javascript
复制
java.lang.AssertionError: assertion failed: Concurrent update to the log. Multiple streaming jobs detected for 10
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:339)

有我失踪的糖果吗?我们如何安排并发作业写入相同的卡夫卡主题在星火?感谢你的想法。

编辑:写作同一个卡夫卡主题

编辑:格式化问题标题

EN

回答 1

Stack Overflow用户

发布于 2020-01-09 06:01:52

当我试图写到两个卡夫卡水槽时,我也犯了同样的错误,这就是我是如何解决它的。我不确定这是否适用于您的情况,因为在我的例子中,我使用的是故障恢复的检查点

对两个接收器使用相同的检查点文件是造成问题的原因。所以如果你使用检查点,这可能是解决办法.

下面是在我的例子中产生错误的代码(使用Python):

代码语言:javascript
复制
output_query_1 = aggDF_1.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .option("topic", TOPIC_1) \
    .outputMode("append") \
    .start()

output_query_2 = aggDF_2.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .option("topic", TOPIC_2) \
    .outputMode("append") \
    .start()

通过不同的检查点文件作为checkpointLocation解决了我的问题:

代码语言:javascript
复制
output_query_1 = aggDF_1.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/checkpoint_1") \
    .option("topic", TOPIC_1) \
    .outputMode("append") \
    .start()

output_query_2 = aggDF_2.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/checkpoint_2") \
    .option("topic", TOPIC_2) \
    .outputMode("append") \
    .start()
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52314765

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档