首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark-Streaming挂起,kafka最早开始偏移(Kafka 2,spark 2.4.3)

Spark-Streaming挂起,kafka最早开始偏移(Kafka 2,spark 2.4.3)
EN

Stack Overflow用户
提问于 2019-09-18 05:35:49
回答 1查看 758关注 0票数 4

我对Spark-Streaming和Kafka有意见。在运行示例程序从Kafka主题消费并将微批结果输出到终端时,当我设置选项时,我的作业似乎挂起了:

df.option("startingOffsets", "earliest")

从最新的偏移量开始工作很好,结果随着每个微批次的流过而打印到终端。

我在想,也许这是一个资源问题--我正在尝试从一个有相当多数据的主题中阅读。但是,我似乎没有内存/cpu问题(使用本地*集群运行此作业)。这项工作似乎从来没有真正开始过,而只是悬而未决:

19/09/17 15:21:37 INFO Metadata: Cluster ID: JFXVL24JQ3K4CEbE-VA58A

代码语言:javascript
复制
  val sc = new SparkConf().setMaster("local[*]").setAppName("spark-test")
  val streamContext = new StreamingContext(sc, Seconds(1))
  val spark = SparkSession.builder().appName("spark-test")
    .getOrCreate()

  val topic = "topic.with.alotta.data"

  //subscribe tokafka
  val df = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load()

 //write
 df.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()
    .awaitTermination()

我希望看到结果打印到console....but上,应用程序似乎就像我提到的那样挂起了。有什么想法吗?这感觉像是一个spark资源问题(因为我正在对一个有大量数据的主题运行一个本地“集群”。是不是有什么关于流式数据帧的本质是我遗漏的?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-09-19 10:31:41

写入控制台会导致每个触发器都会在驱动程序的内存中收集所有数据。由于您目前没有限制批处理的大小,这意味着整个主题内容将在驱动程序中累积。请参阅https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html#output-sinks

设置批处理大小的限制应该可以解决您的问题。从卡夫卡读取时尝试添加maxOffsetsPerTrigger设置...

代码语言:javascript
复制
  val df = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1000)
    .load()

详情请参见https://spark.apache.org/docs/2.4.3/structured-streaming-kafka-integration.html

票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57982370

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档