首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Schema文件定义

Schema文件定义
EN

Stack Overflow用户
提问于 2017-04-13 14:46:51
回答 1查看 2K关注 0票数 0

我有一个简单的火花应用程序,目的是读取分隔文本文件,并将它们保存为拼花格式。

所需的是处理一个平面数据文件(没有标头),该文件将附带一个模式定义。最终结果是一个可执行的jar,它将这些jar作为命令行参数传递。

到目前为止,我已经看过的示例要么是从标题行推断模式,要么是在代码本身中定义模式。如何才能做到这一点?

代码语言:javascript
复制
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;


public class SparkCSVApplication {

public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
    // create Spark Context
    SparkContext context = new SparkContext(conf);
    // create spark Session
    SparkSession sparkSession = new SparkSession(context);

    Dataset<Row> df = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", true)
            .option("inferSchema", true)
            .load("/Users/Chris/Desktop/Meter_Geocode_Data_150215_114551.csv"); //TODO: CMD line arg
                        //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

    System.out.println("========== Print Schema ============");
    df.printSchema();
    System.out.println("========== Print Data ==============");
    df.show();
    System.out.println("========== Generate parquet file ==============");
    df.write().parquet("/Users/Chris/Desktop/meter_geocode.parquet");

}

}

EN

回答 1

Stack Overflow用户

发布于 2017-04-14 21:59:56

inferSchema没有从标头中找出数据类型,row.It从数据本身找出数据类型,这里是文档中的文本-

inferSchema -从数据自动推断输入模式。它需要对数据进行一次额外的传递。如果无设置,则使用默认值false。

编辑:

要将在其他文件中定义的架构与现有的dataframe相关联,有几种方法可以编程完成。

假设main.csv中有数据,第二个文件中的所有标题都叫做header.csv,其中只包含逗号分隔的列名列表。做下面的事-

代码语言:javascript
复制
# read main data file, 
df = spark.read.csv("main.csv",header=False,inferSchema=True)

# read the file where headers are stored as string
hrdd = sc.textFile("header.csv")    
# make a list
newColumns = hrdd.collect()[0].split(",")

# Method # 1 : renaming all columns one by one 

# first get old column names
oldColumns = df.columns

if len(oldColumns) == len(newColumns):
    for i,newCol in enumerate(newColumns):
        df = df.withColumnRenamed(oldColumns[i],newCol)


or
# Method # 2 : just create a new dataframe by passing schema which was derived from reading 2nd file.

df = spark.createDataFrame(df.rdd,schema=newColumns)

免责声明:这是用pyspark编写的,我相信它在java中也很简单。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43395380

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档