
Spark SQL mapping problem

Stack Overflow user
Asked on 2017-01-06 00:03:05
2 answers · 1.1K views · 0 followers · 3 votes

Spark 2 / Java 8 / Cassandra 2. Trying to read some data from Cassandra and then run a GROUP BY query in Spark. The DataFrame has only 2 columns: transdate (Date) and origin (String).

Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins GROUP BY (origin,transdate) ORDER BY cnt DESC LIMIT 1");

Getting the error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'origins.`origin`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value)

Resolved the GROUP BY issue by removing the parentheses in the GROUP BY clause, as below.

Full code (trying to get the origin/location with the most transactions for a trans date):

JavaRDD<TransByDate> originDateRDD = javaFunctions(sc).cassandraTable("trans", "trans_by_date", CassandraJavaUtil.mapRowTo(TransByDate.class))
                    .select(CassandraJavaUtil.column("origin"), CassandraJavaUtil.column("trans_date").as("transdate")).limit((long)100) ;
Dataset<Row> originDF = sparks.createDataFrame(originDateRDD, TransByDate.class);
String[] columns = originDF.columns();
System.out.println("originDF columns: "+columns[0]+" "+columns[1]); // -> transdate origin
originDF.createOrReplaceTempView("origins");

Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins  GROUP BY origin,transdate ORDER BY cnt DESC LIMIT 1"); 
List list = maxOrigindate.collectAsList(); // <- Exception here
int j = list.size();

The bean class behind originDF, TransByDate:

public static class TransByDate implements Serializable {
        private String origin;
        private Date transdate;

        public TransByDate() { }

        public TransByDate (String origin, Date transdate) { 
            this.origin = origin;
            this.transdate= transdate;

        }

        public String getOrigin() { return origin; }
        public void setOrigin(String origin) { this.origin = origin; }

        public Date getTransdate() { return transdate; }
        public void setTransdate(Date transdate) { this.transdate = transdate; }

    }

Schema:

root
 |-- transdate: struct (nullable = true)
 |    |-- date: integer (nullable = false)
 |    |-- day: integer (nullable = false)
 |    |-- hours: integer (nullable = false)
 |    |-- minutes: integer (nullable = false)
 |    |-- month: integer (nullable = false)
 |    |-- seconds: integer (nullable = false)
 |    |-- time: long (nullable = false)
 |    |-- timezoneOffset: integer (nullable = false)
 |    |-- year: integer (nullable = false)
 |-- origin: string (nullable = true)

org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
...
ERROR Executor: Exception in task 0.0 (TID 12)
scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 12, localhost): scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
...
at org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2184)
at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2184)
at spark.SparkTest.sqlMaxCount(SparkTest.java:244) // -> List list = maxOrigindate.collectAsList();

Caused by: scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)


2 Answers

Stack Overflow user

Accepted answer

Posted on 2017-01-14 06:10:42

You are getting the error below:

Caused by: scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) at 

This error is because Spark only supports the java.sql.Date type. Please check the Spark documentation here. You can also refer to SPARK-2562.
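A minimal sketch of the bean change this answer implies, with the field switched from java.util.Date to java.sql.Date so Catalyst can map it. The `toSqlDate` helper is illustrative, not part of the original code:

```java
import java.io.Serializable;
import java.sql.Date;

// Sketch: same bean as in the question, but with java.sql.Date (which
// Spark's Catalyst converters support) instead of java.util.Date.
class TransByDate implements Serializable {
    private String origin;
    private Date transdate; // java.sql.Date, not java.util.Date

    public TransByDate() { }

    public TransByDate(String origin, Date transdate) {
        this.origin = origin;
        this.transdate = transdate;
    }

    // Illustrative helper (an assumption, not from the question): convert
    // values read as java.util.Date, e.g. from the Cassandra rows, before
    // building the DataFrame.
    static Date toSqlDate(java.util.Date d) {
        return new Date(d.getTime());
    }

    public String getOrigin() { return origin; }
    public void setOrigin(String origin) { this.origin = origin; }

    public Date getTransdate() { return transdate; }
    public void setTransdate(Date transdate) { this.transdate = transdate; }
}
```

With this field type, `createDataFrame(originDateRDD, TransByDate.class)` should infer transdate as a DateType column rather than the exploded struct shown in the schema above.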

Votes: 1

Stack Overflow user

Posted on 2017-01-06 05:43:00

Change the query to:

Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins GROUP BY origin, transdate ORDER BY cnt DESC LIMIT 1");

This will work.

Votes: 1
Original page content provided by Stack Overflow; translation supported by Tencent Cloud's IT-domain engine.
Original link:

https://stackoverflow.com/questions/41496851
