首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Scala只读取文件的某些部分。

Scala只读取文件的某些部分。
EN

Stack Overflow用户
提问于 2016-12-16 04:02:49
回答 1查看 1.1K关注 0票数 1

我试图在Scala中读取一个我知道的结构的输入文件,但是我只需要每9项输入一次。到目前为止,我已经成功地阅读了整篇文章:

代码语言:javascript
复制
val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val fields = lines.map(line => line.split(","))

问题是,这给我留下了一个数组,即巨型(我们正在谈论的是20 we的数据)。我不仅看到自己被迫编写一些非常难看的代码,以便在RDD[ArrayString]和ArrayString之间进行转换,而且它实际上使我的代码毫无用处。

我尝试过不同的方法和混合使用

代码语言:javascript
复制
.map()
.flatMap() and
.reduceByKey()

然而,实际上没有任何东西把我收集的“单元格”变成我需要它们的格式。

下面是应该发生的事情:从我们的服务器读取一个文本文件文件夹,代码应该以格式读取每一行文本:

代码语言:javascript
复制
*---------*
| NASDAQ: |
*---------*
exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close

并且只保留一个stock_symbol,因为这是我正在计数的标识符。到目前为止,我的尝试是将整个事件转换为一个数组,从第一个索引到第一个collected_cells变量,每9个索引收集一次。问题是,根据我的计算和现实生活的结果,代码运行需要335天(不是开玩笑)。

下面是我当前的代码供参考:

代码语言:javascript
复制
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkNum {


  def main(args: Array[String]) {

    // Do some Scala voodoo
    val sc = new SparkContext(new SparkConf().setAppName("Spark Numerical"))

    // Set input file as per HDFS structure + input args
    val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
    val fields = lines.map(line => line.split(","))
    var collected_cells:Array[String] = new Array[String](0)

    //println("[MESSAGE] Length of CC: " + collected_cells.length)

    val divider:Long = 9
    val array_length = fields.count / divider
    val casted_length = array_length.toInt

    val indexedFields = fields.zipWithIndex
    val indexKey = indexedFields.map{case (k,v) => (v,k)}

    println("[MESSAGE] Number of lines: " + array_length)
    println("[MESSAGE] Casted lenght of: " + casted_length)



    for( i <- 1 to casted_length ) {

      println("[URGENT DEBUG] Processin line " + i + " of " + casted_length)

      var index = 9 * i - 8

      println("[URGENT DEBUG] Index defined to be " + index)

      collected_cells :+ indexKey.lookup(index)

    }



    println("[MESSAGE] collected_cells size: " + collected_cells.length)



    val single_cells = collected_cells.flatMap(collected_cells => collected_cells);
    val counted_cells = single_cells.map(cell => (cell, 1).reduceByKey{case (x, y) => x + y})
    // val result = counted_cells.reduceByKey((a,b) => (a+b))

    // val inmem = counted_cells.persist()
    //
    // // Collect driver into file to be put into user archive
    // inmem.saveAsTextFile("path to server location")

    // ==> Not necessary to save the result as processing time is recorded, not output


  }

}

在我尝试调试它时,底部部分被注释掉了,但是它充当了伪代码,让我知道我需要做什么。我可能需要指出的是,我几乎不熟悉Scala,因此,诸如_ notation之类的东西使我的生活变得混乱。

耽误您时间,实在对不起。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-12-16 10:24:56

有一些概念需要在这个问题上加以澄清:

当我们执行此代码时:

代码语言:javascript
复制
val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val fields = lines.map(line => line.split(",")) 

这并不会导致数据大小的巨大数组。该表达式表示基本数据的转换。它可以进一步转换,直到我们将数据简化为我们想要的信息集。

在本例中,我们希望使用编码为csv的记录的stock_symbol字段:

代码语言:javascript
复制
exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close

我还将假设数据文件包含如下横幅:

代码语言:javascript
复制
*---------*
| NASDAQ: |
*---------*

我们要做的第一件事就是移除任何看起来像这个横幅的东西。实际上,我将假设第一个字段是以字母数字字符开头的证券交易所的名称。在进行任何分裂之前,我们都会这样做,结果是:

代码语言:javascript
复制
val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val validLines = lines.filter(line => !line.isEmpty && line.head.isLetter)
val fields = validLines.map(line => line.split(","))

它有助于编写变量的类型,让人安心,因为我们有我们期望的数据类型。随着我们在Scala技能方面的进步,这些技能可能会变得不那么重要。让我们用类型重写上面的表达式:

代码语言:javascript
复制
val lines: RDD[String] = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val validLines: RDD[String] = lines.filter(line => !line.isEmpty && line.head.isLetter)
val fields: RDD[Array[String]] = validLines.map(line => line.split(","))

我们对stock_symbol字段感兴趣,它在位置上是基于0的数组中的元素#1:

代码语言:javascript
复制
val stockSymbols:RDD[String] = fields.map(record => record(1))

如果我们想要计数符号,剩下的就是发出一个计数:

代码语言:javascript
复制
val totalSymbolCount = stockSymbols.count()

这不是很有帮助,因为我们对每一项记录都有一个条目。更有趣的问题是:

我们有多少种不同的股票符号?

代码语言:javascript
复制
val uniqueStockSymbols = stockSymbols.distinct.count()

每个符号有多少个记录?

代码语言:javascript
复制
val countBySymbol = stockSymbols.map(s => (s,1)).reduceByKey(_+_)

在Spark2.0中,CSV对Dataframes和数据集的支持是现成的,因为我们的数据没有带有字段名的标题行(在大型数据集中通常是这样),我们需要提供列名:

代码语言:javascript
复制
val stockDF = sparkSession.read.csv("/tmp/quotes_clean.csv").toDF("exchange", "symbol", "date", "open", "close", "volume", "price") 

我们现在可以很容易地回答我们的问题:

代码语言:javascript
复制
val uniqueSymbols =  stockDF.select("symbol").distinct().count
val recordsPerSymbol = stockDF.groupBy($"symbol").agg(count($"symbol"))
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41177097

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档