我正在使用spark和scala从本地机器读取一个CSV文件,并将其存储到dataframe (称为df)中。我必须只从df中选择几个具有新别名名称的选定列,然后保存到新的newDf中。我也试过这样做,但我得到了下面的错误。
main" org.apache.spark.sql.AnalysisException: cannot resolve '`history_temp.time`' given input columns: [history_temp.time, history_temp.poc]下面是从本地机器读取csv文件所编写的代码。
import org.apache.spark.sql.SparkSession
object DataLoadConversion {
def main(args: Array[String]): Unit = {
System.setProperty("spark.sql.warehouse.dir", "file:///C:/spark-warehouse")
val spark = SparkSession.builder().master("local").appName("DataConversion").getOrCreate()
val df = spark.read.format("com.databricks.spark.csv")
.option("quote", "\"")
.option("escape", "\"")
.option("delimiter", ",")
.option("header", "true")
.option("mode", "FAILFAST")
.option("inferSchema","true")
.load("file:///C:/Users/an/Desktop/ct_temp.csv")
df.show(5) // Till this code is working fine
val newDf = df.select("history_temp.time","history_temp.poc")下面是我试过但不起作用的代码。
// val newDf = df.select($"history_temp.time",$"history_temp.poc")
// val newDf = df.select("history_temp.time","history_temp.poc")
// val newDf = df.select( df("history_temp.time").as("TIME"))
// val newDf = df.select(df.col("history_temp.time"))
// df.select(df.col("*")) // This is working
newDf.show(10)
}
}发布于 2019-07-08 20:21:47
从它的外观来看。您的列名格式是这里的问题。我猜它们只是普通的stringType,但是当您有类似history_temp.time的东西时,just会把它看作是一个排列的列。事实并非如此。我会重命名所有的列并替换“。到"“。然后,您可以运行相同的选择,它应该可以工作。你可以用折叠式来重新放置所有的“。下面写着"“。
val replacedDF = df.columns.foldleft(df){ (newdf, colname)=>
newdf.withColumnRenamed (colname, colname.replace(".","_"))
}完成后,您可以在下面的replacedDF中进行选择
val newDf= replacedDf.select("history_temp_time","history_temp_poc")告诉我它是如何为你工作的。
https://stackoverflow.com/questions/56940119
复制相似问题