我有一个简单的火花应用程序,目的是读取分隔文本文件,并将它们保存为拼花格式。
所需的是处理一个平面数据文件(没有标头),该文件将附带一个模式定义。最终结果是一个可执行的jar,它将这些jar作为命令行参数传递。
到目前为止,我已经看过的示例要么是从标题行推断模式,要么是在代码本身中定义模式。如何才能做到这一点?
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkCSVApplication {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);
Dataset<Row> df = sparkSession
.read()
.format("com.databricks.spark.csv")
.option("header", true)
.option("inferSchema", true)
.load("/Users/Chris/Desktop/Meter_Geocode_Data_150215_114551.csv"); //TODO: CMD line arg
//("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Generate parquet file ==============");
df.write().parquet("/Users/Chris/Desktop/meter_geocode.parquet");
}}
发布于 2017-04-14 21:59:56
inferSchema没有从标头中找出数据类型,row.It从数据本身找出数据类型,这里是文档中的文本-
inferSchema -从数据自动推断输入模式。它需要对数据进行一次额外的传递。如果无设置,则使用默认值false。
编辑:
要将在其他文件中定义的架构与现有的dataframe相关联,有几种方法可以编程完成。
假设main.csv中有数据,第二个文件中的所有标题都叫做header.csv,其中只包含逗号分隔的列名列表。做下面的事-
# read main data file,
df = spark.read.csv("main.csv",header=False,inferSchema=True)
# read the file where headers are stored as string
hrdd = sc.textFile("header.csv")
# make a list
newColumns = hrdd.collect()[0].split(",")
# Method # 1 : renaming all columns one by one
# first get old column names
oldColumns = df.columns
if len(oldColumns) == len(newColumns):
for i,newCol in enumerate(newColumns):
df = df.withColumnRenamed(oldColumns[i],newCol)
or
# Method # 2 : just create a new dataframe by passing schema which was derived from reading 2nd file.
df = spark.createDataFrame(df.rdd,schema=newColumns)免责声明:这是用pyspark编写的,我相信它在java中也很简单。
https://stackoverflow.com/questions/43395380
复制相似问题