我有RDD[String],wordRDD。我还有一个函数,它从一个字符串/单词创建一个RDDString。我想为中的每个字符串创建一个新的RDD 。以下是我的尝试:
1)失败,因为星火不支持嵌套的RDD:
var newRDD = wordRDD.map( word => {
// execute myFunction()
(new MyClass(word)).myFunction()
})2)失败(可能是由于范围问题?):
var newRDD = sc.parallelize(new Array[String](0))
val wordArray = wordRDD.collect
for (w <- wordArray){
newRDD = sc.union(newRDD,(new MyClass(w)).myFunction())
}我的理想结果是:
// input RDD (wordRDD)
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...)
// myFunction behavior
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl')
// after executing myFunction() on each word in wordRDD:
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...)我在这里找到了一个相关的问题:Spark when union a lot of RDD throws stack overflow error,但它没有解决我的问题。
发布于 2015-09-10 22:30:02
根据您的意愿,使用flatMap获取RDD[String]。
var allWords = wordRDD.flatMap { word =>
(new MyClass(word)).myFunction().collect()
}发布于 2015-09-10 22:04:05
您不能在另一个RDD中创建RDD。
但是,可以将您的函数myFunction: String => RDD[String]重写为另一个函数modifiedFunction: String => Seq[String],它从一个字母被移除的输入中生成所有单词,这样就可以在RDD中使用它。这样,它也将在集群上并行执行。有了modifiedFunction,只需调用wordRDD.flatMap(modifiedFunction),就可以获得包含所有单词的最终RDD。
关键是使用flatMap (用于map和flatten转换):
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val input = sc.parallelize(Seq("apple", "ananas", "banana"))
// RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...)
val result = input.flatMap(modifiedFunction)
}
def modifiedFunction(word: String): Seq[String] = {
word.indices map {
index => word.substring(0, index) + word.substring(index+1)
}
}https://stackoverflow.com/questions/32512079
复制相似问题