首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >spark streaming无法使用spark sql

spark streaming无法使用spark sql
EN

Stack Overflow用户
提问于 2018-12-21 00:32:10
回答 1查看 79关注 0票数 0

我在spark streaming过程中遇到了一个问题。在它被流式传输并传递给"parse“方法后,我得到了空记录。

我的代码:

代码语言:javascript
复制
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoders
import org.apache.spark.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType, 
IntegerType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType, 
IntegerType}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import java.util.regex.Pattern
import java.util.regex.Matcher
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql._

val conf = new SparkConf().setAppName("streamHive").setMaster("local[*]").set("spark.driver.allowMultipleContexts", "true")

val ssc = new StreamingContext(conf, Seconds(5))    

val sc=ssc.sparkContext

val lines = ssc.textFileStream("file:///home/sadr/testHive")

case class Prices(name: String, age: String,sex: String, location: String)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

def parse (rdd : org.apache.spark.rdd.RDD[String] ) = 
{
var l = rdd.map(_.split(","))
val prices = l.map(p => Prices(p(0),p(1),p(2),p(3)))
val pricesDf = sqlContext.createDataFrame(prices)
pricesDf.registerTempTable("prices")
pricesDf.show()
var x = sqlContext.sql("select count(*) from prices")
x.show()}
lines.foreachRDD { rdd => parse(rdd)} 
lines.print()
ssc.start()

我的输入文件:

代码语言:javascript
复制
cat test1.csv

Riaz,32,M,uk
tony,23,M,india
manu,33,M,china
imart,34,F,AUS

我得到了这个输出:

代码语言:javascript
复制
lines.foreachRDD { rdd => parse(rdd)}

lines.print()

ssc.start()

scala> +----+---+---+--------+
|name|age|sex|location|
+----+---+---+--------+
+----+---+---+--------+

我正在使用Spark版本2.3....在添加X.SHOW()之后,我得到以下错误:

EN

回答 1

Stack Overflow用户

发布于 2018-12-21 01:12:25

不确定您是否真的能够读取这些流。

textFileStream只读取程序启动后添加到目录中的新文件,而不读取现有文件。文件已经在那里了吗?如果是,将其从目录中删除,启动程序并再次复制该文件?

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53872626

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档