我想在spark中做单词计数,我创建了一个rdd,使用sql从数据集中提取不同的tweet。我想在RDD之上使用拆分函数,但它不允许我这样做。
错误:- valuse不是org.apache.spark.sql.SchemaRdd的成员
不起作用的星火密码:-
val disitnct_tweets=hiveCtx.sql("select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_List=sc.parallelize(List(distinct_tweets))
//tried split on both the rdd disnt worked
distinct_tweets.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
distinct_tweets_List.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)但是,当我将数据从sparksql输出到一个文件并再次加载并运行拆分时,它就能工作了。
工作的示例代码:-
val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val rdd=sc.parallelize(distinct_tweets_op)
rdd.saveAsTextFile("/home/cloudera/bdp/op")
val textFile=sc.textFile("/home/cloudera/bdp/op/part-00000")
val counts=textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.SaveAsTextFile("/home/cloudera/bdp/wordcount")我需要一个答案,而不是写文件和重新加载来完成我的拆分函数,是否有一个工作来使拆分函数工作。
谢谢
发布于 2015-05-23 17:06:26
找到答案,其三个步骤的过程,将数据帧或spark.sql.row.RDD转换成普通的RDD。
sc.parallelize(List())映射到字符串
val distinct_tweets=hiveCtx.sql(" select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val distinct_tweets_list=sc.parallelize(List(distinct_tweets_op))
val distinct_tweets_string=distinct_tweets.map(x=>x.toString)
val test_kali=distinct_tweets_string.flatMap(line =>line.split(" ")).map(word => (word,1)).reduceByKey(_+_).sortBy {case (key,value) => -value}.map { case (key,value) => Array(key,value).mkString(",") }
test_kali.collect().foreach(println)
case class kali_test(text: String)
val test_kali_op=test_kali.map(_.split(" ")).map(p => kali_test(p(0)))
test_kali_op.registerTempTable("kali_test")
hiveCtx.sql(" select * from kali_test limit 10 ").collect().foreach(println)这样,我不需要加载一个文件,我可以做我的操作在动态。
谢谢斯里
发布于 2017-05-18 06:45:33
首先,我们不应该进行收集(),然后并行化来创建RDD;这将使驱动程序忙碌/下降。
相反,
val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.map(x => x.mkstring) 考虑到这一点,您只选择查询- distinct(text)中的单个列。
现在,distinct_tweets_op只是一个RDD。
因此,循环这个RDD;您很好地在该RDD中的每个字符串上应用拆分(“”)函数。
发布于 2015-05-23 19:28:23
你第一次失败的主要原因是这一行:
val distinct_tweets_List=sc.parallelize(List(distinct_tweets))在星火中,这是一条完全无用的线路,比无用更糟糕--正如你所看到的那样,它会让你的系统变成坦克。
您希望避免执行collect(),后者创建一个Array并将其返回给驱动程序应用程序。相反,您希望尽可能长时间地将对象保留为RDDs,并尽可能少地将数据返回给驱动程序(例如键和还原后的计数)。
但要回答您的基本问题,下面将使用一个由单个DataFrame列组成的StringType,并将其转换为RDDString
val myRdd = myDf.rdd.map(_.getString(0))虽然SchemaRDDs已不复存在,但我相信下面的内容将将具有单个字符串列的SchemaRDD转换为普通的RDDString
val myRdd = mySchemaRdd.map(_.getString(0))https://stackoverflow.com/questions/30412496
复制相似问题