首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从星火中的DataFrame中筛选和选择数据

从星火中的DataFrame中筛选和选择数据
EN

Stack Overflow用户
提问于 2018-07-24 09:31:13
回答 1查看 1.1K关注 0票数 1

我正在开发一个Spark程序,到目前为止,我想出了以下代码:

代码语言:javascript
复制
object PartitionRetrieval {
    var conf  = new SparkConf().setAppName("Spark-JDBC")
    val log   = LogManager.getLogger("Spark-JDBC Program")
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conFile       = "/home/hmusr/ReconTest/inputdir/testconnection.properties"
    val properties    = new Properties()
    properties.load(new FileInputStream(conFile))
    val connectionUrl = properties.getProperty("gpDevUrl")
    val devUserName   = properties.getProperty("devUserName")
    val devPassword   = properties.getProperty("devPassword")
    val driverClass   = properties.getProperty("gpDriverClass")
    val tableName     = "source.bank_accounts"
    try {
    Class.forName(driverClass).newInstance()
    } catch {
    case cnf: ClassNotFoundException =>
        log.error("Driver class: " + driverClass + " not found")
        System.exit(1)
    case e: Exception =>
        log.error("Exception: " + e.printStackTrace())
        System.exit(1)
    }
    def main(args: Array[String]): Unit = {
        val spark   = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
        val gpTable = spark.read.format("jdbc").option("url", connectionUrl)
                                                        .option("dbtable",tableName)
                                                        .option("user",devUserName)
                                                        .option("password",devPassword).load()
        val rc = gpTable.filter(gpTable("source_system_name")==="ORACLE").count()
        println("gpTable Count: " + rc)
    }
}

在上面的代码中,将语句:val gpTable = spark.read.format("jdbc").option("url", connectionUrl)table: bank_accounts的全部数据转储到DataFrame: gpTable中,然后DataFrame: rc获取过滤后的数据。作为表,我对此表示怀疑: bank_accounts是一个非常小的表,如果将它作为一个整体加载到内存中,它不会产生影响。但在我们的生产中,有数以十亿计的记录。在这种情况下,建议使用JDBC连接将数据加载到DataFrame中的方法是什么?有人能让我知道Spark的入口点的概念吗?

EN

回答 1

Stack Overflow用户

发布于 2018-07-24 11:08:39

  • 那份声明..。将表: bank_accounts的全部数据转储到DataFrame: gpTable中,然后DataFrame: rc获取过滤后的数据。

不是的。DataFrameReader并不急于。它只定义数据绑定。

此外,简单的谓词(比如简单的等式)将检查推送到源,并且在执行计划时只应加载所需的列。

在数据库日志中,您应该会看到类似于

从表中选择1,其中source_system_name = 'ORACLE‘

  • 如果它作为一个数据作为一个整体加载到内存中。

不是的。Spark不会在内存中加载数据,除非它指示(主要是cache),即使这样,它也会将自己限制在适合可用存储内存的块上。

在标准过程中,它只保留计算计划所需的数据。对于全局计划,内存占用不应依赖于数据量。

  • 在这种情况下,建议使用JDBC连接将数据加载到DataFrame中的方法是什么?

有关可伸缩性的问题,请查看Partitioning in spark while reading from RDBMS via JDBCWhats meaning of partitionColumn, lowerBound, upperBound, numPartitions parameters?https://stackoverflow.com/a/45028675/8371915

此外,您还可以阅读Does spark predicate pushdown work with JDBC?

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51495191

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档