首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何创建RowEncoder来映射Tuple<A,Row>到Row?

如何创建RowEncoder来映射Tuple<A,Row>到Row?
EN

Stack Overflow用户
提问于 2017-06-01 13:49:30
回答 2查看 1.4K关注 0票数 1

我有一个火花数据集,在这里,我已经根据我的需要分组和减少了我的数据。我需要去掉元组,只保留Tuple2::_2。我试图按如下方式映射数据集:

代码语言:javascript
复制
sparkSession.read()
            .parquet("s3://stuff/*")
            .groupByKey((MapFunction<Row, Long>) value -> {
                long stamp = value.getAs("timeStamp");
                return stamp / 600000;
            }, Encoders.LONG())
            .reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
                int fare1 = v1.getAs("totalFare");
                int fare2 = v2.getAs("totalFare");
                return fare1 < fare2 ? v1 : v2;
            })
            .map((MapFunction<Tuple2<Long, Row>, Row>) Tuple2::_2, RowEncoder.apply(null))

无法确定如何将架构提供给RowEn编码器::apply。我正在用模式读取一个拼花文件。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-06-04 17:19:07

所以我就这样做了。基本上读取一个元素以获得所需的"ExpressionEncoder“。我需要最后输出中的完整“行”,所以不能继续使用@Jacek的方法。

代码语言:javascript
复制
System.out.println("Starting");
System.out.println(Arrays.toString(args));
Row sampleRow = sparkSession.read().parquet(readFrom).head();
ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(sampleRow.schema());

//read all elements, process and write back the result
sparkSession.read()
            .parquet(readFrom)
            .groupByKey((MapFunction<Row, Long>) value -> {
                long stamp = value.getAs("timeStamp");
                return stamp / 600000;
            }, Encoders.LONG())
            .reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
                int fare1 = v1.getAs("totalFare");
                int fare2 = v2.getAs("totalFare");
                return fare1 < fare2 ? v1 : v2;
            })
            .map((MapFunction<Tuple2<Long, Row>, Row>) Tuple2::_2, rowEncoder)
            .write()
            .parquet(writeTo);
System.out.println("Done !!!!");
票数 2
EN

Stack Overflow用户

发布于 2017-06-02 06:35:42

我没有用Java和Spark,所以我不能说得更具体,但是.

如果我没有弄错,您只想使用timeStamptotalFare字段。timeStamp是长型的,totalFare是int型的。

我的第一个建议是使用Row操作符将未键入的Dataset[Long, Int]留给Dataset[Long, Int](在Scala中):

公共数据集as(编码器证据$2)返回一个新的数据集,其中每个记录都映射到指定的类型。

这样,您将避免处理这个不愉快的Row对象,您的转换如下所示:

代码语言:javascript
复制
sparkSession.read()
            .parquet("s3://stuff/*")
            .as(Encoder...)  // <-- I don't know how to write a tuple of (long, int) in Java

这样做之后,如果我没有弄错(试图将我的Scala思想映射到Encoders.INT() ),您关于Encoders.INT()的问题就会被“映射”到使用Encoders.INT()

我提倡使用as操作符的原因是,在我看来,使用groupByKeyreduceGroups是一种非常强烈的愿望,希望将未类型化的RelationalGroupedDataset API留给类型化KeyValueGroupedDataset

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44309393

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档