Suppose there are multiple databases in MongoDB (DB1, DB2, ..., DBa, DBb, ...), and each of them has several collections (Col1A, Col1B, ..., Col2A, Col2B, ...).
I am looking for a way to manage multiple inputs and outputs in MongoDB, and I would like to write this as a self-contained Scala application. The pseudocode below illustrates the idea:
readconfig_DB1.Col1A=Read setting pointing to DB=DB1 and collection=Col1A
readconfig_DB2.Col2B=Read setting pointing to DB=DB2 and collection=Col2B
val rdd_DB1.Col1A = MongoSpark.load(sc_DB1.Col1A)
val rdd_DB2.Col2B = MongoSpark.load(sc_DB2.Col2B)
DF_Transformation1 = Do some transformations on DF1a and DF2b
DF_Transformation2 = Do some transformations on DF1b and DF2a
writeConfig_DBa.Col1A=Write setting pointing to DB=DBa and collection=Col1A
writeConfig_DBb.Col2B=Write setting pointing to DB=DBb and collection=Col2B
MongoSpark.save(DF_Transformation1, writeConfig_DBa.Col1A)
MongoSpark.save(DF_Transformation2, writeConfig_DBb.Col2B)

Edit 1:
I tried to solve the problem as follows.
Folder structure:
$find .
.
./src
./src/main
./src/main/scala
./src/main/scala/application.conf
./src/main/scala/SimpleApp.scala
./build.sbt

Contents of build.sbt:
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1",
"org.apache.spark" %% "spark-core" % "2.4.1",
"org.apache.spark" %% "spark-sql" % "2.4.1"
)

Contents of application.conf:
config {
  database {
    "spark_mongodb_input_uri": "mongodb://127.0.0.1/test.myCollection",
    "spark_mongodb_user": "",
    "spark_mongodb_pass": "",
    "spark_mongodb_input_database": "test",
    "spark_mongodb_input_collection": "myCollection",
    "spark_mongodb_input_readPreference_name": "",
    "spark_mongodb_output_database": "test",
    "spark_mongodb_output_collection": "myCollection"
  }
  newreaderone {
    "database": "test",
    "collection": "myCollection",
    "readPreference.name": ""
  }
  newwriterone {
    "uri": "mongodb://127.0.0.1/test.myCollection",
    "database": "test",
    "collection": "myCollection",
    "replaceDocument": "false", // If set to True, updates an existing document
    "readPreference.name": "",
    "maxBatchSize": "128"
  }
}

Contents of SimpleApp.scala:
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession

object FirstApp {
  def main(args: Array[String]) {
    import com.typesafe.{Config,ConfigFactory}
    val appConfig: Config = ConfigFactory.load("application.conf")
    import scala.collection.JavaConverters._
    val initial_conf:Config = appconf.getConfig("config.database")
    val confMap: Map[String,String] = initial_conf.entrySet()
      .iterator.asScala
      .map(e => e.getKey.replaceAll("_",".") -> e.getValue.unwrapped.toString).toMap
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame,SparkSession}
    val sparkConfig: SparkConf = new SparkConf()
    sparkConfig.setAll(confMap)
    val spark: SparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport.getOrCreate
    import com.mongodb.spark._
    val data: DataFrame = MongoSpark.load(spark)
    import com.mongodb.spark.config._
    val nreader = appConfig.getConfig("config.newreaderone")
    val readMap: Map[String,Any] = nreader.entrySet()
      .iterator.asScala
      .map(e => e.getKey -> e.getValue.unwrapped)
      .toMap
    val customReader = ReadConfig(readMap)
    val newDF: DataFrame = spark.read.mongo(customReader)
    resultDF.write.mode("append").mongo()
  }
}

The build fails with the following errors:
sbt package
[info] Updated file /Path/3/project/build.properties: set sbt.version to 1.3.10
[info] Loading global plugins from /home/sadegh/.sbt/1.0/plugins
[info] Loading project definition from /Path/3/project
[info] Loading settings for project root-3 from build.sbt ...
[info] Set current project to root-3 (in build file:/Path/3/)
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] Compiling 1 Scala source to /Path/3/target/scala-2.11/classes ...
[error] /Path/3/src/main/scala/SimpleApp.scala:8:13: object typesafe is not a member of package com
[error] import com.typesafe.{Config,ConfigFactory}
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:9:17: not found: type Config
[error] val appConfig: Config = ConfigFactory.load("application.conf")
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:9:26: not found: value ConfigFactory
[error] val appConfig: Config = ConfigFactory.load("application.conf")
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:11:19: not found: type Config
[error] val initial_conf:Config = appconf.getConfig("config.database")
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:11:28: not found: value appconf
[error] val initial_conf:Config = appconf.getConfig("config.database")
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:19:56: not found: value sparkConf
[error] val spark: SparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport.getOrCreate
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:28:21: overloaded method value apply with alternatives:
[error] (options: scala.collection.Map[String,String])com.mongodb.spark.config.ReadConfig.Self <and>
[error] (sparkConf: org.apache.spark.SparkConf)com.mongodb.spark.config.ReadConfig.Self <and>
[error] (sqlContext: org.apache.spark.sql.SQLContext)com.mongodb.spark.config.ReadConfig.Self <and>
[error] (sparkSession: org.apache.spark.sql.SparkSession)com.mongodb.spark.config.ReadConfig.Self <and>
[error] (sparkContext: org.apache.spark.SparkContext)com.mongodb.spark.config.ReadConfig.Self
[error] cannot be applied to (scala.collection.immutable.Map[String,Any])
[error] val customReader = ReadConfig(readMap)
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:29:36: value mongo is not a member of org.apache.spark.sql.DataFrameReader
[error] val newDF: DataFrame = spark.read.mongo(customReader)
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:30:2: not found: value resultDF
[error] resultDF.write.mode("append").mongo()
[error] ^
[error] 9 errors found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 12 s, completed Jun 14, 2020 6:55:43 PM

Posted on 2020-06-13 08:28:40
You can pass these configurations to the application as input in HOCON format. You can try the following HOCON configuration snippet for multiple read and write configurations.
config {
  database {
    "spark_mongodb_input_uri": "mongodb://connection/string/here",
    "spark_mongodb_user": "your_user_name",
    "spark_mongodb_pass": "your_password",
    "spark_mongodb_input_database": "Some_Db_Name",
    "spark_mongodb_input_collection": "Some_Col_Name",
    "spark_mongodb_input_readPreference_name": "primaryPreferred",
    "spark_mongodb_output_database": "Some_Output_Db_Name",
    "spark_mongodb_output_collection": "Some_Output_Col_Name"
  }
  newreaderone {
    "database": "sf",
    "collection": "matchrecord",
    "readPreference.name": "secondaryPreferred"
  }
  newwriterone {
    "uri": "mongodb://uri of same or new mongo cluster/instance",
    "database": "db_name",
    "collection": "col_name",
    "replaceDocument": "false", // If set to True, updates an existing document
    "readPreference.name": "secondaryPreferred",
    "maxBatchSize": "128"
  }
}

The configuration above has been tested and can be read easily with the Typesafe Config library.
Update:
Put the configuration above in a file called application.conf and read that file with the lines of code below.
Step 1: Read the config file
import com.typesafe.config.{Config, ConfigFactory}

val appConfig: Config = ConfigFactory.load("application.conf")
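Note that ConfigFactory.load resolves the name against the classpath rather than the filesystem, so application.conf belongs under src/main/resources (as in the folder structure further below). A minimal sketch, assuming that location:

import com.typesafe.config.{Config, ConfigFactory}

// With the default name application.conf on the classpath, the no-argument load() is equivalent
val appConfig: Config = ConfigFactory.load()
// Quick sanity check of a single value from the database section
println(appConfig.getString("config.database.spark_mongodb_input_database"))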
Step 2: To initialize Spark for reading from and writing to MongoDB, we use the configuration under the database section, as follows:

import scala.collection.JavaConverters._

val initial_conf: Config = appConfig.getConfig("config.database")
val confMap: Map[String,String] = initial_conf.entrySet()
  .iterator.asScala
  .map(e => e.getKey.replaceAll("_", ".") -> e.getValue.unwrapped.toString).toMap
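For illustration, the underscore-to-dot renaming above turns the HOCON keys into the property names the MongoDB Spark connector reads from the SparkConf; a quick way to see the result, reusing confMap from the step above:

// Print the derived Spark properties, e.g.
//   spark_mongodb_input_uri       becomes spark.mongodb.input.uri
//   spark_mongodb_input_database  becomes spark.mongodb.input.database
confMap.foreach { case (key, value) => println(s"$key -> $value") }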
Step 3: Create the SparkSession

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkConfig: SparkConf = new SparkConf()
sparkConfig.setAll(confMap)
val spark: SparkSession = SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
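As an optional check that the MongoDB settings actually reached the session, a small sketch:

// List the connector-related settings the session picked up
spark.conf.getAll
  .filter { case (key, _) => key.startsWith("spark.mongodb") }
  .foreach { case (key, value) => println(s"$key = $value") }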
Step 4: Read from MongoDB into a DataFrame

import com.mongodb.spark._

val data: DataFrame = MongoSpark.load(spark)

The step above reads the collection specified in the database section of the configuration.
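To verify the read, an optional quick inspection of the DataFrame:

// Schema inferred from the MongoDB documents, plus a peek at the first rows
data.printSchema()
data.show(5)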
Step 5: Read a new collection:
import com.mongodb.spark.config._

val nreader = appConfig.getConfig("config.newreaderone")
val readMap: Map[String,String] = nreader.entrySet()
  .iterator.asScala
  .map(e => e.getKey -> e.getValue.unwrapped.toString)
  .toMap
// Pass ReadConfig(spark) as the default so the connection URI from the SparkConf is reused
val customReader = ReadConfig(readMap, Some(ReadConfig(spark)))
// Load with the explicit ReadConfig; MongoSpark.load avoids needing the connector's SQL implicits
val newDF: DataFrame = MongoSpark.load(spark, customReader)

Step 6: Write to a MongoDB collection
resultDF.write.mode("append").mongo()上面的代码写入配置的数据库部分下指定的集合。
(ii) Write to a collection other than the one specified in the SparkConf:

import com.mongodb.spark.config._

val nwriter = appConfig.getConfig("config.newwriterone")
val writerMap: Map[String,String] = nwriter.entrySet()
  .iterator.asScala
  .map(e => e.getKey -> e.getValue.unwrapped.toString).toMap
val writeConf = WriteConfig(writerMap)
MongoSpark.save(resultDF, writeConf)
Update:

The complete code then looks like the following; at the end, note the two ways of saving the data to MongoDB.
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.JavaConverters._
import com.mongodb.spark._
import com.mongodb.spark.config._

object ReadWriteMongo {
  def main(args: Array[String]): Unit = {
    val appConfig: Config = ConfigFactory.load("application.conf")

    // Spark/MongoDB settings from the "database" section, with underscores turned into dots
    val initial_conf: Config = appConfig.getConfig("config.database")
    val confMap: Map[String, String] = initial_conf.entrySet()
      .iterator.asScala
      .map(e => e.getKey.replaceAll("_", ".") -> e.getValue.unwrapped.toString).toMap

    val sparkConfig: SparkConf = new SparkConf()
    sparkConfig.setAll(confMap)
    val spark: SparkSession = SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate

    // Reads the collection configured in the database section
    val data: DataFrame = MongoSpark.load(spark)

    // Describe a second input using the "newreaderone" section
    val nreader = appConfig.getConfig("config.newreaderone")
    val readMap: Map[String, String] = nreader.entrySet()
      .iterator.asScala
      .map(e => e.getKey -> e.getValue.unwrapped.toString)
      .toMap
    val customReader = ReadConfig(readMap, Some(ReadConfig(spark)))

    /*
     * Read Data from MongoDB
     */
    val newDF: DataFrame = MongoSpark.load(spark, customReader)

    /*
     * After performing your processing on newDF above, save the result
     * in a new DataFrame called "resultDF". The placeholder below simply
     * reuses newDF; replace it with your actual transformations.
     */
    val resultDF: DataFrame = newDF

    // (i) Save to the collection configured in the SparkConf (the database section)
    MongoSpark.save(resultDF)

    /*
     * (ii) Alternatively, save a DataFrame by passing an explicit WriteConfig
     */
    val nwriter = appConfig.getConfig("config.newwriterone")
    val writerMap: Map[String, String] = nwriter.entrySet()
      .iterator.asScala
      .map(e => e.getKey -> e.getValue.unwrapped.toString).toMap
    val writeConf = WriteConfig(writerMap)
    MongoSpark.save(resultDF, writeConf)
  }
}

The folder structure should be as follows:
src/
src/main/scala/com/test/ReadWriteMongo.scala
src/main/resources/application.conf

Update: build.sbt
val sparkVersion = "2.4.1"
scalaVersion := "2.11.12"
scalacOptions ++= Seq(
"-deprecation",
"-feature",
"-Xfuture",
"-encoding",
"UTF-8",
"-unchecked",
"-language:postfixOps"
)
libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.4.0",
"org.mongodb.spark" %% "mongo-spark-connector" % sparkVersion,
"org.apache.spark" %% "spark-core" % sparkVersion % Provided,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided
)
mainClass in assembly := Some("com.test.ReadWriteMongo")
assembly / test := {}
assemblyJarName in assembly := s"${name.value}-${version.value}.jar"
assemblyMergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
case "reference.conf" => MergeStrategy.concat
case x: String if x.contains("UnusedStubClass.class") => MergeStrategy.first
case _ => MergeStrategy.first
}
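The assembly settings above come from the sbt-assembly plugin, so the project also needs a project/plugins.sbt; a minimal sketch (the plugin version is an assumption, use whichever matches your sbt 1.x setup):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

Since the Spark dependencies are marked Provided, build the fat jar with sbt assembly and submit it to Spark, for example:

sbt assembly
spark-submit --class com.test.ReadWriteMongo --master local[*] target/scala-2.11/<assembled-jar-name>.jar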
https://stackoverflow.com/questions/62317048