我试图在JDBC中使用预置语句。其结果是ResultSet对象。我想把它转换成火花数据格式。
object JDBCRead {
val tableName:String = "TABLENAME"
val url :String = "jdbc:teradata://TERADATA_URL/user=USERNAME,password=PWD,charset=UTF8,TYPE=FASTEXPORT,SESSIONS=10"
val selectTable:String = "SELECT * FROM " + tableName +" sample 10";
val con : Connection = DriverManager.getConnection(url);
val pstmt2: PreparedStatement = con.prepareStatement(selectTable)
import java.sql.ResultSet
val rs: ResultSet = pstmt2.executeQuery
val rsmd: ResultSetMetaData = rs.getMetaData
while(rs.next()!=null)
{
val k: Boolean = rs.next()
for(i<-1 to rsmd.getColumnCount) {
print(" " + rs.getObject(i))
}
println()
}
}我想从Spark调用上面的代码,这样我就可以将数据加载到Dataframe中,并以更快的速度获得结果。
我必须使用PreparedStatement。我不能使用spark.jdbc.load,因为Teradata的FASTEXPORT不适用于jdbc。它必须与PreparedStatement一起使用
如何做到这一点?如何将用户预置语句与SELECT语句一起加载到Spark中。
发布于 2017-08-04 20:07:56
-
对于这类需求,有两个选项可用: 1.
DataFrame2.JdbcRDD
I将提供 https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala (因为您对预付费语句如此特殊)
在prepareStatement内部使用compute方法。因此,您不需要创建连接并显式维护它(容易出错)。
稍后,您可以将结果转换为dataframe。
为了加快速度,您可以配置其他参数。
JdbcRDD的示例代码用法如下。
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext.__
import org.apache.spark.SparkConf
import org.apache.spark.rdd.JdbcRDD
import java.sql.{connection, DriverManager,ResultSet}
object jdbcRddExample {
def main(args: Array[String]) {
// Connection String
VAL URL = "jdbc:teradata://SERVER/demo"
val username = "demo"
val password = "Spark"
Class.forName("com.teradata.jdbc.Driver").newInstance
// Creating & Configuring Spark Context
val conf = new SparkConf().setAppName("App1").setMaster("local[2]").set("spark.executor.memory",1)
val sc = new SparkContext(conf)
println("Start...")
// Fetching data from Database
val myRDD = new JdbcRDD(sc,() => DriverManager.getConnection(url,username,password),
"select first_name, last_name, gender from person limit ?,?",
3,5,1,r => r.getString("last_name") + "," +r.getString("first_name"))
// Displaying the content
myRDD.foreach(println)
// Saving the content inside Text File
myRDD.saveAsTextFile("c://jdbcrdd")
println("End...")
}
}https://stackoverflow.com/questions/45512678
复制相似问题