首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将java Resultset转换为Spark数据

如何将java Resultset转换为Spark数据
EN

Stack Overflow用户
提问于 2017-08-04 17:43:22
回答 1查看 4.6K关注 0票数 1

我试图在JDBC中使用预置语句。其结果是ResultSet对象。我想把它转换成火花数据格式。

代码语言:javascript
复制
object JDBCRead {

val tableName:String = "TABLENAME"
val url :String = "jdbc:teradata://TERADATA_URL/user=USERNAME,password=PWD,charset=UTF8,TYPE=FASTEXPORT,SESSIONS=10"
val  selectTable:String  = "SELECT * FROM " + tableName +" sample 10";

 val con : Connection = DriverManager.getConnection(url);


 val pstmt2: PreparedStatement = con.prepareStatement(selectTable)

import java.sql.ResultSet

val rs: ResultSet = pstmt2.executeQuery



val rsmd: ResultSetMetaData = rs.getMetaData
while(rs.next()!=null)
{
  val k: Boolean = rs.next()
  for(i<-1 to rsmd.getColumnCount) {
    print(" " + rs.getObject(i))
  }
  println()
}

}

我想从Spark调用上面的代码,这样我就可以将数据加载到Dataframe中,并以更快的速度获得结果。

我必须使用PreparedStatement。我不能使用spark.jdbc.load,因为Teradata的FASTEXPORT不适用于jdbc。它必须与PreparedStatement一起使用

如何做到这一点?如何将用户预置语句与SELECT语句一起加载到Spark中。

EN

回答 1

Stack Overflow用户

发布于 2017-08-04 20:07:56

-

对于这类需求,有两个选项可用: 1. DataFrame 2. JdbcRDD

I将提供 https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala (因为您对预付费语句如此特殊)

在prepareStatement内部使用compute方法。因此,您不需要创建连接并显式维护它(容易出错)。

稍后,您可以将结果转换为dataframe。

为了加快速度,您可以配置其他参数。

JdbcRDD的示例代码用法如下。

代码语言:javascript
复制
import org.apache.log4j.{Level, Logger}
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext.__
  import org.apache.spark.SparkConf
  import org.apache.spark.rdd.JdbcRDD
  import java.sql.{connection, DriverManager,ResultSet}


  object jdbcRddExample {
    def main(args: Array[String]) {

        // Connection String    
        VAL URL = "jdbc:teradata://SERVER/demo"
        val username = "demo"
        val password = "Spark"
        Class.forName("com.teradata.jdbc.Driver").newInstance
        // Creating & Configuring Spark Context
        val conf = new SparkConf().setAppName("App1").setMaster("local[2]").set("spark.executor.memory",1)
        val sc = new SparkContext(conf)
        println("Start...")
        // Fetching data from Database
        val myRDD = new JdbcRDD(sc,() => DriverManager.getConnection(url,username,password),
        "select first_name, last_name, gender from person limit ?,?",
        3,5,1,r => r.getString("last_name") + "," +r.getString("first_name"))
        // Displaying the content
        myRDD.foreach(println)
        // Saving the content inside Text File
        myRDD.saveAsTextFile("c://jdbcrdd")

        println("End...")
    }
  }
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45512678

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档