我对Spark-Streaming和Kafka有意见。在运行示例程序从Kafka主题消费并将微批结果输出到终端时,当我设置选项时,我的作业似乎挂起了:
df.option("startingOffsets", "earliest")
从最新的偏移量开始工作很好,结果随着每个微批次的流过而打印到终端。
我在想,也许这是一个资源问题--我正在尝试从一个有相当多数据的主题中阅读。但是,我似乎没有内存/cpu问题(使用本地*集群运行此作业)。这项工作似乎从来没有真正开始过,而只是悬而未决:
19/09/17 15:21:37 INFO Metadata: Cluster ID: JFXVL24JQ3K4CEbE-VA58A
val sc = new SparkConf().setMaster("local[*]").setAppName("spark-test")
val streamContext = new StreamingContext(sc, Seconds(1))
val spark = SparkSession.builder().appName("spark-test")
.getOrCreate()
val topic = "topic.with.alotta.data"
//subscribe tokafka
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
//write
df.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.start()
.awaitTermination()我希望看到结果打印到console....but上,应用程序似乎就像我提到的那样挂起了。有什么想法吗?这感觉像是一个spark资源问题(因为我正在对一个有大量数据的主题运行一个本地“集群”。是不是有什么关于流式数据帧的本质是我遗漏的?
发布于 2019-09-19 10:31:41
写入控制台会导致每个触发器都会在驱动程序的内存中收集所有数据。由于您目前没有限制批处理的大小,这意味着整个主题内容将在驱动程序中累积。请参阅https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html#output-sinks
设置批处理大小的限制应该可以解决您的问题。从卡夫卡读取时尝试添加maxOffsetsPerTrigger设置...
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 1000)
.load()详情请参见https://spark.apache.org/docs/2.4.3/structured-streaming-kafka-integration.html。
https://stackoverflow.com/questions/57982370
复制相似问题