
Managing read and write configuration settings in MongoDB

Stack Overflow user
Asked on 2020-06-11 04:24:52
1 answer · 426 views · 0 followers · 2 votes

Suppose there are several databases in MongoDB (DB1, DB2, ..., DBa, DBb, ...), each holding some collections (Col1A, Col1B, ..., Col2A, Col2B, ...).

I am looking for a way to manage multiple inputs and outputs in MongoDB from a single self-contained Scala application. Below is pseudocode that illustrates the idea:

readConfig_DB1_Col1A = read setting pointing to DB=DB1 and collection=Col1A
readConfig_DB2_Col2B = read setting pointing to DB=DB2 and collection=Col2B

val rdd_DB1_Col1A = MongoSpark.load(sc_DB1_Col1A)
val rdd_DB2_Col2B = MongoSpark.load(sc_DB2_Col2B)

DF_Transformation1 = do some transformations on DF1a and DF2b
DF_Transformation2 = do some transformations on DF1b and DF2a

writeConfig_DBa_Col1A = write setting pointing to DB=DBa and collection=Col1A
writeConfig_DBb_Col2B = write setting pointing to DB=DBb and collection=Col2B

MongoSpark.save(DF_Transformation1, writeConfig_DBa_Col1A)
MongoSpark.save(DF_Transformation2, writeConfig_DBb_Col2B)
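
For illustration, here is a rough, untested sketch of how the pseudocode above could map onto the connector's actual ReadConfig/WriteConfig API (all database, collection, and uri values are placeholders):

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import org.apache.spark.sql.{DataFrame, SparkSession}

// the session carries default input/output uris; per-collection configs override them
val spark = SparkSession.builder
  .appName("MultiCollectionSketch")
  .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/DB1.Col1A")
  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/DBa.Col1A")
  .getOrCreate()

// one ReadConfig per (database, collection) pair, falling back to the session defaults
val readDB1Col1A = ReadConfig(Map("database" -> "DB1", "collection" -> "Col1A"), Some(ReadConfig(spark)))
val readDB2Col2B = ReadConfig(Map("database" -> "DB2", "collection" -> "Col2B"), Some(ReadConfig(spark)))

val df1a: DataFrame = MongoSpark.load(spark, readDB1Col1A)
val df2b: DataFrame = MongoSpark.load(spark, readDB2Col2B)

val transformation1: DataFrame = df1a // placeholder: replace with real transformations of df1a and df2b

// one WriteConfig per target (database, collection) pair
val writeDBaCol1A = WriteConfig(Map("database" -> "DBa", "collection" -> "Col1A"), Some(WriteConfig(spark)))
MongoSpark.save(transformation1, writeDBaCol1A)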

Edit 1

I tried to solve the problem as follows.

Folder structure:

$ find .
.
./src
./src/main
./src/main/scala
./src/main/scala/application.conf
./src/main/scala/SimpleApp.scala
./build.sbt

Contents of build.sbt:

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1",
  "org.apache.spark" %% "spark-core" % "2.4.1",
  "org.apache.spark" %% "spark-sql" % "2.4.1"
)

Contents of application.conf:

config{
    database {
      "spark_mongodb_input_uri": "mongodb://127.0.0.1/test.myCollection",
      "spark_mongodb_user":"",
      "spark_mongodb_pass":"",
      "spark_mongodb_input_database": "test",
      "spark_mongodb_input_collection": "myCollection",
      "spark_mongodb_input_readPreference_name": "",
      "spark_mongodb_output_database": "test",
      "spark_mongodb_output_collection": "myCollection"
    }

    newreaderone {
      "database": "test",
      "collection": "myCollection",
      "readPreference.name": ""
    }

    newwriterone {
      "uri":"mongodb://127.0.0.1/test.myCollection"
      "database": "test",
      "collection": "myCollection",
      "replaceDocument": "false",//If set to True, updates an existing document
      "readPreference.name": "",
      "maxBatchSize": "128"
    }
}

Contents of SimpleApp.scala:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession

object FirstApp {
  def main(args: Array[String]) {

    import com.typesafe.{Config,ConfigFactory}
    val appConfig: Config = ConfigFactory.load("application.conf")
    import scala.collection.JavaConverters._
    val initial_conf:Config = appconf.getConfig("config.database")
    val confMap: Map[String,String] = initial_conf.entrySet()
    .iterator.asScala
    .map(e => e.getKey.replaceAll("_",".") -> e.getValue.unwrapped.toString).toMap
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame,SparkSession}
    val sparkConfig: SparkConf=new SparkConf()
    sparkConfig.setAll(confMap)
    val spark: SparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport.getOrCreate
    import com.mongodb.spark._
    val data: DataFrame = MongoSpark.load(spark)
    import com.mongodb.spark.config._
    val nreader = appConfig.getConfig("config.newreaderone")
    val readMap: Map[String,Any] = nreader.entrySet()
    .iterator.asScala
    .map(e => e.getKey -> e.getValue.unwrapped)
    .toMap
    val customReader = ReadConfig(readMap)
    val newDF: DataFrame = spark.read.mongo(customReader)
    resultDF.write.mode("append").mongo()

 }
}

Errors when compiling:

sbt package
[info] Updated file /Path/3/project/build.properties: set sbt.version to 1.3.10
[info] Loading global plugins from /home/sadegh/.sbt/1.0/plugins
[info] Loading project definition from /Path/3/project
[info] Loading settings for project root-3 from build.sbt ...
[info] Set current project to root-3 (in build file:/Path/3/)
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] Compiling 1 Scala source to /Path/3/target/scala-2.11/classes ...
[error] /Path/3/src/main/scala/SimpleApp.scala:8:13: object typesafe is not a member of package com
[error]         import com.typesafe.{Config,ConfigFactory}
[error]                    ^
[error] /Path/3/src/main/scala/SimpleApp.scala:9:17: not found: type Config
[error]         val appConfig: Config = ConfigFactory.load("application.conf")
[error]                        ^
[error] /Path/3/src/main/scala/SimpleApp.scala:9:26: not found: value ConfigFactory
[error]         val appConfig: Config = ConfigFactory.load("application.conf")
[error]                                 ^
[error] /Path/3/src/main/scala/SimpleApp.scala:11:19: not found: type Config
[error]         val initial_conf:Config = appconf.getConfig("config.database")
[error]                          ^
[error] /Path/3/src/main/scala/SimpleApp.scala:11:28: not found: value appconf
[error]         val initial_conf:Config = appconf.getConfig("config.database")
[error]                                   ^
[error] /Path/3/src/main/scala/SimpleApp.scala:19:56: not found: value sparkConf
[error]         val spark: SparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport.getOrCreate
[error]                                                               ^
[error] /Path/3/src/main/scala/SimpleApp.scala:28:21: overloaded method value apply with alternatives:
[error]   (options: scala.collection.Map[String,String])com.mongodb.spark.config.ReadConfig.Self <and>
[error]   (sparkConf: org.apache.spark.SparkConf)com.mongodb.spark.config.ReadConfig.Self <and>
[error]   (sqlContext: org.apache.spark.sql.SQLContext)com.mongodb.spark.config.ReadConfig.Self <and>
[error]   (sparkSession: org.apache.spark.sql.SparkSession)com.mongodb.spark.config.ReadConfig.Self <and>
[error]   (sparkContext: org.apache.spark.SparkContext)com.mongodb.spark.config.ReadConfig.Self
[error]  cannot be applied to (scala.collection.immutable.Map[String,Any])
[error]         val customReader = ReadConfig(readMap)
[error]                            ^
[error] /Path/3/src/main/scala/SimpleApp.scala:29:36: value mongo is not a member of org.apache.spark.sql.DataFrameReader
[error]         val newDF: DataFrame = spark.read.mongo(customReader)
[error]                                           ^
[error] /Path/3/src/main/scala/SimpleApp.scala:30:2: not found: value resultDF
[error]         resultDF.write.mode("append").mongo()
[error]         ^
[error] 9 errors found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 12 s, completed Jun 14, 2020 6:55:43 PM

1 Answer

Stack Overflow user

Answered on 2020-06-13 08:28:40

You can pass these configurations to the application as input in HOCON format. You can try the following HOCON snippet for multiple read and write configurations.

config{
    database {
      "spark_mongodb_input_uri": "mongodb://connection/string/here",
      "spark_mongodb_user":"your_user_name",
      "spark_mongodb_pass":"your_password",
      "spark_mongodb_input_database": "Some_Db_Name",
      "spark_mongodb_input_collection": "Some_Col_Name",
      "spark_mongodb_input_readPreference_name": "primaryPreferred",
      "spark_mongodb_output_database": "Some_Output_Db_Name",
      "spark_mongodb_output_collection": "Some_Output_Col_Name"
    }

    newreaderone {
      "database": "sf",
      "collection": "matchrecord",
      "readPreference.name": "secondaryPreferred"
    }

    newwriterone {
      "uri":"mongodb://uri of same or new mongo cluster/instance"
      "database": "db_name",
      "collection": "col_name",
      "replaceDocument": "false",//If set to True, updates an existing document
      "readPreference.name": "secondaryPreferred",
      "maxBatchSize": "128"
    }
}

The configuration above has been tested and can be read easily with the Typesafe Config library.
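
Since the goal is many readers and writers, one way to generalize the steps below is a small helper that flattens any HOCON section into the string map that ReadConfig/WriteConfig accept. This is an untested sketch; with it, adding a section such as newreadertwo (hypothetical) costs one line of code:

import com.typesafe.config.{Config, ConfigFactory}
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import scala.collection.JavaConverters._

// flatten one HOCON section (e.g. "config.newreaderone") into Map[String, String]
def sectionToMap(root: Config, path: String): Map[String, String] =
  root.getConfig(path).entrySet().iterator().asScala
    .map(e => e.getKey -> e.getValue.unwrapped.toString)
    .toMap

val root = ConfigFactory.load("application.conf")

// build as many read/write configs as there are sections;
// if a section has no uri, pass defaults, e.g. ReadConfig(map, Some(ReadConfig(spark)))
val writerOne = WriteConfig(sectionToMap(root, "config.newwriterone"))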

Update:

Put the above configuration in a file called application.conf. The file can then be read with the lines of code below.

Step 1: Read the configuration file

import com.typesafe.config.{Config, ConfigFactory}

// load() resolves the file from the classpath (e.g. src/main/resources)
val appConfig: Config = ConfigFactory.load("application.conf")

Step 2: To initialize Spark for reading from and writing to MongoDB, we use the configuration under the database section, as follows:

import scala.collection.JavaConverters._

val initial_conf: Config = appConfig.getConfig("config.database")

// turn keys like "spark_mongodb_input_uri" into Spark properties like "spark.mongodb.input.uri"
val confMap: Map[String, String] = initial_conf.entrySet()
  .iterator.asScala
  .map(e => e.getKey.replaceAll("_", ".") -> e.getValue.unwrapped.toString)
  .toMap

Step 3: Create the SparkSession

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkConfig: SparkConf = new SparkConf()

sparkConfig.setAll(confMap)

// enableHiveSupport requires the spark-hive dependency; drop it if you do not use Hive
val spark: SparkSession = SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate

Step 4: Read from MongoDB into a DataFrame

import com.mongodb.spark._
val data: DataFrame = MongoSpark.load(spark)

The step above reads the collection specified in the database section of the configuration.
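
As a quick optional check that the default read works, you can inspect the result:

// optional sanity check on the default-collection load
data.printSchema()
println(s"document count: ${data.count()}")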

Step 5: Read a new collection:

import com.mongodb.spark.config._

val nreader = appConfig.getConfig("config.newreaderone")

// ReadConfig expects Map[String, String], so stringify the values
val readMap: Map[String, String] = nreader.entrySet()
  .iterator.asScala
  .map(e => e.getKey -> e.getValue.unwrapped.toString)
  .toMap

// the defaults (Some(ReadConfig(spark))) supply the connection uri from the Spark conf
val customReader = ReadConfig(readMap, Some(ReadConfig(spark)))

val newDF: DataFrame = MongoSpark.load(spark, customReader)

Step 6: Write to a MongoDB collection

(i) Write to the collection specified in the SparkConf:

// resultDF is the transformed DataFrame you want to persist
MongoSpark.save(resultDF.write.mode("append"))

The code above writes to the collection specified under the database section of the configuration.

(ii) Write to a collection other than the one specified in the SparkConf:

import com.mongodb.spark.config._

val nwriter = appConfig.getConfig("config.newwriterone")

// WriteConfig also expects Map[String, String]
val writerMap: Map[String, String] = nwriter.entrySet()
  .iterator.asScala
  .map(e => e.getKey -> e.getValue.unwrapped.toString)
  .toMap

val writeConf = WriteConfig(writerMap)

MongoSpark.save(resultDF, writeConf)
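
Note that newwriterone carries its own uri, so no defaults are needed when building the WriteConfig. Options such as replaceDocument and maxBatchSize are standard connector write options and are parsed from their string values.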

Update:

The complete code looks as follows. At the end, note the two ways of saving data to MongoDB.

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.JavaConverters._
import com.mongodb.spark._
import com.mongodb.spark.config._


object ReadWriteMongo {

  def main(args: Array[String]): Unit = {

    val appConfig: Config = ConfigFactory.load("application.conf")

    val initial_conf: Config = appConfig.getConfig("config.database")

    // turn "spark_mongodb_input_uri" style keys into "spark.mongodb.input.uri" Spark properties
    val confMap: Map[String, String] = initial_conf.entrySet()
      .iterator.asScala
      .map(e => e.getKey.replaceAll("_", ".") -> e.getValue.unwrapped.toString)
      .toMap

    val sparkConfig: SparkConf = new SparkConf()
    sparkConfig.setAll(confMap)

    // enableHiveSupport needs the spark-hive dependency; drop it if you do not use Hive
    val spark: SparkSession = SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate

    val data: DataFrame = MongoSpark.load(spark)

    val nreader = appConfig.getConfig("config.newreaderone")

    // ReadConfig expects Map[String, String]; the defaults fill in the uri from the Spark conf
    val readMap: Map[String, String] = nreader.entrySet()
      .iterator.asScala
      .map(e => e.getKey -> e.getValue.unwrapped.toString)
      .toMap

    val customReader = ReadConfig(readMap, Some(ReadConfig(spark)))

    /*
     * Read data from MongoDB
     */
    val newDF: DataFrame = MongoSpark.load(spark, customReader)

    /*
     * After performing your processing on newDF, save the result in a new
     * DataFrame called "resultDF". The placeholder below just reuses newDF.
     */
    val resultDF: DataFrame = newDF // placeholder: replace with your transformations

    // (i) save to the collection configured in the SparkConf
    MongoSpark.save(resultDF.write.mode("append"))

    /*
     * (ii) Alternatively, save a DataFrame by passing a WriteConfig
     */
    val nwriter = appConfig.getConfig("config.newwriterone")

    val writerMap: Map[String, String] = nwriter.entrySet()
      .iterator.asScala
      .map(e => e.getKey -> e.getValue.unwrapped.toString)
      .toMap

    val writeConf = WriteConfig(writerMap)

    MongoSpark.save(resultDF, writeConf)
  }
}

The folder structure should be as follows:

src/
src/main/scala/com/test/ReadWriteMongo.scala
src/main/resources/application.conf

Updated build.sbt:

// the assembly settings below require the sbt-assembly plugin in project/plugins.sbt
val sparkVersion = "2.4.1"

scalaVersion := "2.11.12"

scalacOptions ++= Seq(
  "-deprecation",
  "-feature",
  "-Xfuture",
  "-encoding",
  "UTF-8",
  "-unchecked",
  "-language:postfixOps"
)

libraryDependencies ++= Seq(
  "com.typesafe" % "config" % "1.4.0",
  "org.mongodb.spark" %% "mongo-spark-connector" % sparkVersion,
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided
)

// the main class is the fully qualified object name, not the file name
mainClass in assembly := Some("com.test.ReadWriteMongo")

assembly / test := {}

assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")       => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")   => MergeStrategy.discard
  case "reference.conf"                                 => MergeStrategy.concat
  case x: String if x.contains("UnusedStubClass.class") => MergeStrategy.first
  case _                                                => MergeStrategy.first
}
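
For completeness, a typical way to build and submit the job (a sketch: it assumes sbt-assembly is enabled in project/plugins.sbt, and <name>-<version>.jar stands for whatever assemblyJarName produces):

sbt assembly
spark-submit \
  --class com.test.ReadWriteMongo \
  --master local[*] \
  target/scala-2.11/<name>-<version>.jar
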
Votes: -1
Original question:

https://stackoverflow.com/questions/62317048
