我正在以火花数据集的形式加载一个拼花文件。我可以从查询中查询和创建新的数据集。现在,我想向dataset ("hashkey")添加一个新列并生成值(例如md5sum(nameValue))。我怎样才能做到这一点?
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("Hello Spark");
sparkConf.setMaster("local");
SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example")
.config("spark.master", "local").config("spark.sql.warehouse.dir", "file:///C:\\spark_warehouse")
.getOrCreate();
Dataset<org.apache.spark.sql.Row> df = spark.read().parquet("meetup.parquet");
df.show();
df.createOrReplaceTempView("tmpview");
Dataset<Row> namesDF = spark.sql("SELECT * FROM tmpview where name like 'Spark-%'");
namesDF.show();
}输出如下所示:
+-------------+-----------+-----+---------+--------------------+
| name|meetup_date|going|organizer| topics|
+-------------+-----------+-----+---------+--------------------+
| Spark-H20| 2016-01-01| 50|airisdata|[h2o, repeated sh...|
| Spark-Avro| 2016-01-02| 60|airisdata| [avro, usecases]|
|Spark-Parquet| 2016-01-03| 70|airisdata| [parquet, usecases]|
+-------------+-----------+-----+---------+--------------------+发布于 2017-04-10 12:40:45
只需在查询中为MD5添加spark函数即可。
Dataset<Row> namesDF = spark.sql("SELECT *, md5(name) as modified_name FROM tmpview where name like 'Spark-%'");发布于 2018-02-08 13:01:33
Dataset<Row> ds = sqlContext.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.option("delimiter","|")
.load("/home/cloudera/Desktop/data.csv");
ds.printSchema();将打印如下:
root
|-- ReferenceValueSet_Id: integer (nullable = true)
|-- ReferenceValueSet_Name: string (nullable = true)
|-- Code_Description: string (nullable = true)
|-- Code_Type: string (nullable = true)
|-- Code: string (nullable = true)
|-- CURR_FLAG: string (nullable = true)
|-- REC_CREATE_DATE: timestamp (nullable = true)
|-- REC_UPDATE_DATE: timestamp (nullable = true)
Dataset<Row> df1 = ds.withColumn("Key", functions.lit(1));
df1.printSchema();在添加上述代码后,它将添加一个具有常量值的列。
root
|-- ReferenceValueSet_Id: integer (nullable = true)
|-- ReferenceValueSet_Name: string (nullable = true)
|-- Code_Description: string (nullable = true)
|-- Code_Type: string (nullable = true)
|-- Code: string (nullable = true)
|-- CURR_FLAG: string (nullable = true)
|-- REC_CREATE_DATE: timestamp (nullable = true)
|-- REC_UPDATE_DATE: timestamp (nullable = true)
|-- Key: integer (nullable = true)可以看到将具有名称键的列添加到dataset中。
如果您想要添加某些列替代常数值,可以使用下面的代码来添加它。
Dataset<Row> df1 = ds.withColumn("Key", functions.lit(ds.col("Code")));
df1.printSchema();
df1.show();现在,它将把watever的值打印到列代码中。进入名为Key的新列中。
https://stackoverflow.com/questions/43323158
复制相似问题