我正在尝试使用spark streaming从HDFS读取数据。下面是我的代码。
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.hadoop.fs._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Seconds(10))
val directory ="hdfs://pc-XXXX:9000/hdfs/watchdirectory/"
val lines=ssc.fileStream[LongWritable, Text, TextInputFormat](directory, (t:Path) => true, true).map(_._2.toString)
lines.count()
lines.print()
ssc.start
ssc.awaitTermination()代码会运行,但不会从HDFS读取任何数据。每隔10秒,我就会得到一个空行。
我已经浏览了fileStream的文档,并且我知道我已经将文件移到了监视目录。但它对我不起作用。我也尝试过textFileStream,但没有成功。
我使用的是使用Scala 2.11.8构建的spark 2.0.0
有什么建议请提出来。
发布于 2016-10-04 06:31:25
请在下面尝试
val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Seconds(10))
val lines= ssc.textFileStream("hdfs://pc-XXXX:9000/hdfs/watchdirectory/").map(_._2.toString)
lines.count()
lines.print()
ssc.start
ssc.awaitTermination()执行此操作后,将文件移动到
/hdfs/watchdirectory/https://stackoverflow.com/questions/39839009
复制相似问题