首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在星火-jdbc连接中操作numPartitions、lowerBound、upperBound?

如何在星火-jdbc连接中操作numPartitions、lowerBound、upperBound?
EN

Stack Overflow用户
提问于 2018-07-25 10:37:36
回答 2查看 8.9K关注 0票数 2

我正在尝试使用spark读取postgres db上的一个表。为此,我提出了以下代码:

代码语言:javascript
复制
object PartitionRetrieval {
  var conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.default.parallelism", "20")
  val log   = LogManager.getLogger("Spark-JDBC Program")
  Logger.getLogger("org").setLevel(Level.ERROR)
  val conFile       = "/home/myuser/ReconTest/inputdir/testconnection.properties"
  val properties    = new Properties()
  properties.load(new FileInputStream(conFile))
  val connectionUrl = properties.getProperty("gpDevUrl")
  val devUserName   = properties.getProperty("devUserName")
  val devPassword   = properties.getProperty("devPassword")
  val driverClass   = properties.getProperty("gpDriverClass")
  val tableName     = "base.ledgers"
  try {
    Class.forName(driverClass).newInstance()
  } catch {
    case cnf: ClassNotFoundException =>
      log.error("Driver class: " + driverClass + " not found")
      System.exit(1)
    case e: Exception =>
      log.error("Exception: " + e.printStackTrace())
      System.exit(1)
  }
  def main(args: Array[String]): Unit = {
    val spark   = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
    import spark.implicits._
    val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password",devPassword).load()
    val rc = gpTable.filter(gpTable("source_system_name")==="ORACLE" && gpTable("period_year")==="2017").count()
    println("gpTable Count: " + rc)
  }
}

现在,我正在获取行数,以查看连接是否成功或失败。它是一个巨大的表,获得计数要慢一些,我理解这一点,因为没有给出分区号和列名的参数来进行数据分区。

在很多地方,我看到jdbc对象是以以下方式创建的:

代码语言:javascript
复制
val gpTable2 = spark.read.jdbc(connectionUrl, tableName, connectionProperties) 

我用options以另一种格式创建了它。我无法理解当jdbc连接使用‘numPartitions’:val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password",devPassword).load()形成时,如何给出数据分区的分区列名。

有人能让我知道吗

  1. 如何将参数:numPartitions, lowerBound, upperBound添加到以这种方式编写的jdbc对象中: val gpTable = spark.read.format("jdbc").option("url",connectionUrl).option("dbtable",tableName).option("user",devUserName).option(“密码”,devPassword).load()
  2. 如何只添加columnnamenumPartition,因为我想获取2017年的所有行,并且不希望选择一系列行(lowerBound,upperBound)
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-07-25 14:11:37

选项numPartitions, lowerBound, upperBound and PartitionColumn控制火花中的并行读取。PartitionColumn需要一个完整的列。如果表中没有合适的列,则可以使用ROW_NUMBER作为分区列。

试试看,

代码语言:javascript
复制
val rowCount = spark.read.format("jdbc").option("url", connectionUrl)
                                       .option("dbtable","(select count(*) AS count * from tableName where source_system_name = "ORACLE" AND "period_year = "2017")")
                                       .option("user",devUserName)
                                       .option("password",devPassword)
                                       .load()
                                       .collect()
                                       .map(row => row.getAs[Int]("count")).head

我们获得了为提供的谓词返回的行数,该谓词可以用作upperBount。

代码语言:javascript
复制
val gpTable = spark.read.format("jdbc").option("url", connectionUrl)
                                   .option("dbtable","(select ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS RNO, * from tableName source_system_name = "ORACLE" AND "period_year = "2017")")
                                   .option("user",devUserName)
                                   .option("password",devPassword)
                                   .option("numPartitions", 10)
                                   .option("partitionColumn", "RNO")
                                   .option("lowerBound", 1)
                                   .option("upperBound", rowCount)
                                   .load()

numPartitions取决于与Postgres的并行连接的数量。您可以根据从DB读取时所需的并行化来调整这一点。

票数 11
EN

Stack Overflow用户

发布于 2018-07-25 13:02:30

要处理像这样的查询,依赖于星火聚合是没有意义的。

更好的做法是将工作委托给数据库:

代码语言:javascript
复制
val sourceSystemName = "ORACLE"

val gpTable = spark.read.format("jdbc").option("url", connectionUrl)
   .option("dbtable",
     s"(SELECT COUNT(*) FROM $tableName WHERE source_system_name = '$sourceSystemName') AS t")
   .option("user",devUserName)
   .option("password",devPassword).load()

不需要额外的配置,数据处理的效率也尽可能高,就在它所在的地方。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51516822

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档