我试图在Scala中读取一个我知道的结构的输入文件,但是我只需要每9项输入一次。到目前为止,我已经成功地阅读了整篇文章:
val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val fields = lines.map(line => line.split(","))问题是,这给我留下了一个数组,即巨型(我们正在谈论的是20 we的数据)。我不仅看到自己被迫编写一些非常难看的代码,以便在RDD[ArrayString]和ArrayString之间进行转换,而且它实际上使我的代码毫无用处。
我尝试过不同的方法和混合使用
.map()
.flatMap() and
.reduceByKey()然而,实际上没有任何东西把我收集的“单元格”变成我需要它们的格式。
下面是应该发生的事情:从我们的服务器读取一个文本文件文件夹,代码应该以格式读取每一行文本:
*---------*
| NASDAQ: |
*---------*
exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close并且只保留一个stock_symbol,因为这是我正在计数的标识符。到目前为止,我的尝试是将整个事件转换为一个数组,从第一个索引到第一个collected_cells变量,每9个索引收集一次。问题是,根据我的计算和现实生活的结果,代码运行需要335天(不是开玩笑)。
下面是我当前的代码供参考:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkNum {
def main(args: Array[String]) {
// Do some Scala voodoo
val sc = new SparkContext(new SparkConf().setAppName("Spark Numerical"))
// Set input file as per HDFS structure + input args
val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val fields = lines.map(line => line.split(","))
var collected_cells:Array[String] = new Array[String](0)
//println("[MESSAGE] Length of CC: " + collected_cells.length)
val divider:Long = 9
val array_length = fields.count / divider
val casted_length = array_length.toInt
val indexedFields = fields.zipWithIndex
val indexKey = indexedFields.map{case (k,v) => (v,k)}
println("[MESSAGE] Number of lines: " + array_length)
println("[MESSAGE] Casted lenght of: " + casted_length)
for( i <- 1 to casted_length ) {
println("[URGENT DEBUG] Processin line " + i + " of " + casted_length)
var index = 9 * i - 8
println("[URGENT DEBUG] Index defined to be " + index)
collected_cells :+ indexKey.lookup(index)
}
println("[MESSAGE] collected_cells size: " + collected_cells.length)
val single_cells = collected_cells.flatMap(collected_cells => collected_cells);
val counted_cells = single_cells.map(cell => (cell, 1).reduceByKey{case (x, y) => x + y})
// val result = counted_cells.reduceByKey((a,b) => (a+b))
// val inmem = counted_cells.persist()
//
// // Collect driver into file to be put into user archive
// inmem.saveAsTextFile("path to server location")
// ==> Not necessary to save the result as processing time is recorded, not output
}
}在我尝试调试它时,底部部分被注释掉了,但是它充当了伪代码,让我知道我需要做什么。我可能需要指出的是,我几乎不熟悉Scala,因此,诸如_ notation之类的东西使我的生活变得混乱。
耽误您时间,实在对不起。
发布于 2016-12-16 10:24:56
有一些概念需要在这个问题上加以澄清:
当我们执行此代码时:
val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val fields = lines.map(line => line.split(",")) 这并不会导致数据大小的巨大数组。该表达式表示基本数据的转换。它可以进一步转换,直到我们将数据简化为我们想要的信息集。
在本例中,我们希望使用编码为csv的记录的stock_symbol字段:
exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close我还将假设数据文件包含如下横幅:
*---------*
| NASDAQ: |
*---------*我们要做的第一件事就是移除任何看起来像这个横幅的东西。实际上,我将假设第一个字段是以字母数字字符开头的证券交易所的名称。在进行任何分裂之前,我们都会这样做,结果是:
val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val validLines = lines.filter(line => !line.isEmpty && line.head.isLetter)
val fields = validLines.map(line => line.split(","))它有助于编写变量的类型,让人安心,因为我们有我们期望的数据类型。随着我们在Scala技能方面的进步,这些技能可能会变得不那么重要。让我们用类型重写上面的表达式:
val lines: RDD[String] = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val validLines: RDD[String] = lines.filter(line => !line.isEmpty && line.head.isLetter)
val fields: RDD[Array[String]] = validLines.map(line => line.split(","))我们对stock_symbol字段感兴趣,它在位置上是基于0的数组中的元素#1:
val stockSymbols:RDD[String] = fields.map(record => record(1))如果我们想要计数符号,剩下的就是发出一个计数:
val totalSymbolCount = stockSymbols.count()这不是很有帮助,因为我们对每一项记录都有一个条目。更有趣的问题是:
我们有多少种不同的股票符号?
val uniqueStockSymbols = stockSymbols.distinct.count()每个符号有多少个记录?
val countBySymbol = stockSymbols.map(s => (s,1)).reduceByKey(_+_)在Spark2.0中,CSV对Dataframes和数据集的支持是现成的,因为我们的数据没有带有字段名的标题行(在大型数据集中通常是这样),我们需要提供列名:
val stockDF = sparkSession.read.csv("/tmp/quotes_clean.csv").toDF("exchange", "symbol", "date", "open", "close", "volume", "price") 我们现在可以很容易地回答我们的问题:
val uniqueSymbols = stockDF.select("symbol").distinct().count
val recordsPerSymbol = stockDF.groupBy($"symbol").agg(count($"symbol"))https://stackoverflow.com/questions/41177097
复制相似问题