我有一个火花数据集,在这里,我已经根据我的需要分组和减少了我的数据。我需要去掉元组,只保留Tuple2::_2。我试图按如下方式映射数据集:
sparkSession.read()
.parquet("s3://stuff/*")
.groupByKey((MapFunction<Row, Long>) value -> {
long stamp = value.getAs("timeStamp");
return stamp / 600000;
}, Encoders.LONG())
.reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
int fare1 = v1.getAs("totalFare");
int fare2 = v2.getAs("totalFare");
return fare1 < fare2 ? v1 : v2;
})
.map((MapFunction<Tuple2<Long, Row>, Row>) Tuple2::_2, RowEncoder.apply(null))无法确定如何将架构提供给RowEn编码器::apply。我正在用这模式读取一个拼花文件。
发布于 2017-06-04 17:19:07
所以我就这样做了。基本上读取一个元素以获得所需的"ExpressionEncoder“。我需要最后输出中的完整“行”,所以不能继续使用@Jacek的方法。
System.out.println("Starting");
System.out.println(Arrays.toString(args));
Row sampleRow = sparkSession.read().parquet(readFrom).head();
ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(sampleRow.schema());
//read all elements, process and write back the result
sparkSession.read()
.parquet(readFrom)
.groupByKey((MapFunction<Row, Long>) value -> {
long stamp = value.getAs("timeStamp");
return stamp / 600000;
}, Encoders.LONG())
.reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
int fare1 = v1.getAs("totalFare");
int fare2 = v2.getAs("totalFare");
return fare1 < fare2 ? v1 : v2;
})
.map((MapFunction<Tuple2<Long, Row>, Row>) Tuple2::_2, rowEncoder)
.write()
.parquet(writeTo);
System.out.println("Done !!!!");发布于 2017-06-02 06:35:42
我没有用Java和Spark,所以我不能说得更具体,但是.
如果我没有弄错,您只想使用timeStamp和totalFare字段。timeStamp是长型的,totalFare是int型的。
我的第一个建议是使用Row操作符将未键入的Dataset[Long, Int]留给Dataset[Long, Int](在Scala中):
公共数据集as(编码器证据$2)返回一个新的数据集,其中每个记录都映射到指定的类型。
这样,您将避免处理这个不愉快的Row对象,您的转换如下所示:
sparkSession.read()
.parquet("s3://stuff/*")
.as(Encoder...) // <-- I don't know how to write a tuple of (long, int) in Java这样做之后,如果我没有弄错(试图将我的Scala思想映射到Encoders.INT() ),您关于Encoders.INT()的问题就会被“映射”到使用Encoders.INT()。
我提倡使用as操作符的原因是,在我看来,使用groupByKey和reduceGroups是一种非常强烈的愿望,希望将未类型化的RelationalGroupedDataset API留给类型化KeyValueGroupedDataset。
https://stackoverflow.com/questions/44309393
复制相似问题