首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将卡桑德拉表格数据的顺序数字处理转换为星火中的并行处理?

如何将卡桑德拉表格数据的顺序数字处理转换为星火中的并行处理?
EN

Stack Overflow用户
提问于 2019-03-27 17:47:32
回答 1查看 127关注 0票数 2

我们正在使用火花卡桑德拉连接器对来自Cassandra表的数据进行一些数学建模,并且执行目前是连续的,以获得输出。如何将其并行化以加快执行速度?

我对Spark还不熟悉,我尝试了一些东西,但我无法理解如何在map、groupby、还原函数中使用表格数据。如果有人能帮助解释(用一些代码片段)如何对表格数据进行parrellize,这将是非常有帮助的。

代码语言:javascript
复制
import org.apache.spark.sql.{Row, SparkSession}
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf



class SparkExample(sparkSession: SparkSession, pathToCsv: String) {
  private val sparkContext = sparkSession.sparkContext
  sparkSession.stop()
  val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host","127.0.0.1")                           
  .setAppName("cassandra").setMaster("local[*]")
  val sc = new SparkContext(conf)


 def testExample(): Unit = {




val KNMI_rdd = sc.cassandraTable ("dbks1","knmi_w")


val Table_count = KNMI_rdd.count()
val KNMI_idx = KNMI_rdd.zipWithIndex
val idx_key = KNMI_idx.map{case (k,v) => (v,k)}

var i = 0
var n : Int = Table_count.toInt


println(Table_count)

for ( i  <- 1 to n if i < n) {
  println(i)


  val Row = idx_key.lookup(i)

  println(Row)


  val firstRow = Row(0)



  val yyyy_var = firstRow.get[Int]("yyyy")
  val mm_var = firstRow.get[Double]("mm")
  val dd_var = firstRow.get[Double]("dd")
  val dr_var = firstRow.get[Double]("dr")
  val tg_var = firstRow.get[Double]("tg")
  val ug_var = firstRow.get[Double]("ug")
  val loc_var = firstRow.get[String]("loc")



  val pred_factor = (((0.15461 * tg_var) + (0.8954 * ug_var)) / ((0.0000451 * dr_var) + 0.0004487))




  println(yyyy_var,mm_var,dd_var,loc_var)
  println(pred_factor)

 }

 }
}

  //test data

// loc | yyyy | mm | dd | dr  | tg  | ug
//-----+------+----+----+-----+-----+----
// AMS | 2019 |  1 |  1 |  35 |   5 | 84
// AMS | 2019 |  1 |  2 |  76 |  34 | 74
// AMS | 2019 |  1 |  3 |  46 |  33 | 85
// AMS | 2019 |  1 |  4 |  35 |   1 | 84
// AMS | 2019 |  1 |  5 |  29 |   0 | 93
// AMS | 2019 |  1 |  6 |  32 |  25 | 89
// AMS | 2019 |  1 |  7 |  42 |  23 | 89
// AMS | 2019 |  1 |  8 |  68 |  75 | 92
// AMS | 2019 |  1 |  9 |  98 |  42 | 86
// AMS | 2019 |  1 | 10 |  92 |  12 | 76
// AMS | 2019 |  1 | 11 |  66 |   0 | 71
// AMS | 2019 |  1 | 12 |  90 |  56 | 85
// AMS | 2019 |  1 | 13 |  83 | 139 | 90

编辑1:我厌倦了使用地图函数,我能够计算出数学计算,我如何在这些由WeatherId定义的值前面添加键呢?

代码语言:javascript
复制
            case class Weather( loc: String, yyyy: Int, mm: Int, dd: Int,dr: Double, tg: Double, ug: Double)
            case class WeatherId(loc: String, yyyy: Int, mm: Int, dd: Int)

                   val rows = dataset1
                                        .map(line => Weather(
                                              line.getAs[String]("loc"),
                                              line.getAs[Int]("yyyy"),
                                              line.getAs[Int]("mm"),
                                              line.getAs[Int]("dd"),
                                              line.getAs[Double]("dr"),
                                              line.getAs[Double]("tg"),
                                              line.getAs[Double]("ug")
                                                            ) )


                  val pred_factor   = rows
                                        .map(x => (( ((x.dr * betaz) + (x.tg * betay)) + (x.ug) * betaz)))

谢谢

EN

回答 1

Stack Overflow用户

发布于 2019-03-28 13:45:35

TL;DR;

使用Dataframe/Dataset而不是RDD。

DFs相对于RDDs的争论是很长的,但缺少的是DFs和它们的结构化替代DS的性能优于低级别RDDs。

通过使用火花-卡桑德拉连接器,您可以使用配置输入拆分大小来规定spark中分区大小的大小,更多的分区更具有并行性。

代码语言:javascript
复制
val lastdf = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table" -> "words",
    "keyspace" -> "test" ,
    "cluster" -> "ClusterOne",
    "spark.cassandra.input.split.size_in_mb" -> 48 // smaller size = more partitions
    )
  ).load()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55383576

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档