当Spark结构化流中的查询执行不设置触发器时,
import org.apache.spark.sql.streaming.Trigger
// Default trigger (runs micro-batch as soon as it can)
df.writeStream
.format("console")
//.trigger(???) // <--- Trigger intentionally omitted ----
.start()截至火花2.4.3 (2019年8月)。结构化流编程指南-触发器说
如果没有显式指定触发器设置,那么默认情况下,查询将以微批处理模式执行,在前面的微批处理完成后,将立即生成微批处理。
问题:是根据哪一个默认触发器来决定微批次的大小?
比方说。输入源是卡夫卡。由于一些故障,这项工作中断了一天。那么同样的火花作业就会重新启动。然后,它将在中断的地方使用消息。这是否意味着,第一批微批次将是一个巨大的批次,1天的味精积累在卡夫卡的主题中,而工作却停止了?假设这个任务需要10个小时来处理那个大批,那么下一个微批处理需要10小时的消息?并且逐渐地直到X迭代来追赶积压到更小的微批次。
发布于 2019-08-22 18:52:58
默认触发器是在哪一个基础上决定微批的大小?
事实并非如此。每个触发器(无论多么长)都只是请求输入数据集的所有源,以及它们所提供的所有源都是由操作符下游处理的。资源来源知道该给予什么,因为他们知道到目前为止已经消费(加工)了什么。
这就好像您询问了一个批处理结构化查询以及要处理的单个“触发器”请求的数据大小(BTW有ProcessingTime.Once触发器)。
这是否意味着,第一批微批次将是一个巨大的批次,1天的味精积累在卡夫卡的主题中,而工作却停止了?
几乎(而且实际上与星火结构化流没有多大关系)。
底层的Kafka使用者要处理的记录数量是由max.poll.records或者其他一些配置属性配置的(参见在一次民意测验中增加卡夫卡消费者阅读的信息数量)。
由于Spark结构化流使用了Kafka数据源,这只是Kafka的一个包装器,无论在单个微批中发生什么,都等同于这个单一的Consumer.poll调用。
您可以使用带有kafka.前缀的选项(例如,kafka.bootstrap.servers)来配置底层的卡夫卡消费者,这些选项在驱动程序和执行器上为卡夫卡消费者考虑。
https://stackoverflow.com/questions/57612213
复制相似问题