我正在尝试并行地从MongoDB获取数据,并将所有数据存储在一个集合中,查看名称,以便我可以将它们引用回来。
为此,我创建了一个集合,在这里我试图存储数据格式和视图。我正在将错误元素附加到集合中。我试过用向量,列表,塞克。但似乎什么都不适合我。
有没有办法处理这些问题?
var mongoFrames = Nil
for(c <- collections) {
var connectionString = connectionInt.setCollection(c);
var dframe = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", connectionString).load()
var view = dframe.createOrReplaceTempView(c);
var mongoQuery = s"select * from $c where tuid in (${tuidIn.mkString(",")})";
var tup = (c, dframe, view, mongoQuery)
mongoFrames += tup
}
for(v <- mongoFrames) yield spark.sql(v._4).collect() // load data from source into spark更新
当尝试使用+:时,我会收到以下错误
错误:值+:不是(String,org.apache.spark.sql.DataFrame,Unit,String) mongoFrames +:mongoFrames
的成员
发布于 2021-08-10 13:49:33
你可以把它写成:
var mongoFrames: Seq[Tuple3[String, DataFrame,String]] = Seq.empty和
var tup: Tuple[String, DataFrame, String] = (c, dframe, mongoQuery)
mongoFrames = mongoFrames :+ tup然后
迭代它
for(v <- mongoFrames) yield spark.sql(v._3).collect() 编辑1:
在本例中,迭代集合的一种更惯用的方法是编写:
mongoFrames.foreach(spark.sql(_._3).collect())使用匿名函数。
这是以下简称:
mongoFrames.foreach(mongoFrame => spark.sql(mongoFrame._3).collect())发布于 2021-08-10 13:30:46
这应该适用于你:
var mongoFrames = List.empty[(String, DataFrame, Unit, String)]
for(c <- collections) {
//...
mongoFrames = mongoFrames:+ tup
}不要在元组中添加createOrReplaceTempView变量,因为方法返回单元没有用。您可以在SparkSession中使用具有其名称的访问权限。
https://stackoverflow.com/questions/68727139
复制相似问题