我用这种方式向卡夫卡写数据。
df.write()格式(“kafka”)保存()
我能控制卡夫卡的写作速度以避免对卡夫卡的压力吗?有什么办法可以帮助减慢速度吗?
发布于 2019-11-01 11:35:28
我认为将linger.ms设置为非零值会有帮助。因为它控制在发送当前批之前等待其他消息的时间。代码可以如下所示
df.write.format("kafka").option("linger.ms", "100").save()但这取决于很多事情。如果你的卡夫卡足够大,配置得当,我不会太担心速度。毕竟,卡夫卡就是为了应对这种情况而设计的(交通高峰)。
发布于 2019-11-04 09:56:28
一般情况下,结构化流将尝试在默认情况下尽可能快地处理数据。每个源中都有允许控制处理速率的选项,例如文件源中的maxFilesPerTrigger和卡夫卡源中的maxOffsetsPerTrigger。
val streamingETLQuery = cloudtrailEvents
.withColumn("date", $"timestamp".cast("date") // derive the date
.writeStream
.trigger(ProcessingTime("10 seconds")) // check for files every 10s
.format("parquet") // write as Parquet partitioned by date
.partitionBy("date")
.option("path", "/cloudtrail")
.option("checkpointLocation", "/cloudtrail.checkpoint/")
.start()
val df = spark.readStream
.format("text")
.option("maxFilesPerTrigger", 1)
.load("text-logs")有关更多详细信息,请阅读下列链接:
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSource.html https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-FileStreamSource.html https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
https://stackoverflow.com/questions/58656194
复制相似问题