Hi, I am trying to read configuration from a config file in my Spark/Scala application. I wrote the code below.
val conf = com.typesafe.config.ConfigFactory.load(args(0))
var url=conf.getString("parameters.spark-hive.url")
var db=conf.getString("parameters.spark-hive.dbname")
val sparksession = SparkSession.builder()
  .appName("myapp")
  .config("spark.sql.hive.hiveserver2.jdbc.url", url)
  .enableHiveSupport()
  .getOrCreate()
Below is my application.conf file (src/main/resources/application.conf):
parameters {
  spark-hive {
    url = """jdbc://xxxxxxxxxxxx""",
    dbname = """bdname"""
  }
}
And I use the following spark-submit command:
spark-submit \
--conf "spark.executor.extraClassPath=-Dconfig.file=application.conf" \
--verbose \
--class classjarname \
project_jar
/path/config-1.2.0.jar \
/path/application.conf
But I got the error below:
Exception in thread "main" com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'parameters'
Note: I'm generating the jar using Maven and running on HDP 3.X.
Posted on 2020-04-06 18:18:27
You can print out the actual value of args(0) to see where the (full) path is pointing. This works for me:
com.typesafe.config.ConfigFactory.parseFile(new java.io.File(args(0)))
Additional notes:
What does project_jar in your submit command refer to? There also seems to be a typo around the hive url, as the code and the SparkSession configuration do not match.
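For reference, a minimal sketch of combining parseFile with the classpath configuration; the key names follow the question's application.conf, and the fallback through ConfigFactory.load is an assumption rather than something stated in this answer:
import com.typesafe.config.ConfigFactory

// Sketch (inside main): parse the file whose path is passed as args(0), then
// resolve it against whatever configuration is already on the classpath.
val fileConfig = ConfigFactory.parseFile(new java.io.File(args(0)))
val conf       = ConfigFactory.load(fileConfig)

val url = conf.getString("parameters.spark-hive.url")
val db  = conf.getString("parameters.spark-hive.dbname")
println(s"url=$url, dbname=$db")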
Posted on 2020-04-06 18:38:33
Exception in thread "main" com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'parameters' means that the key parameters could not be loaded. That key is an entry in your conf file, so the configuration file is either not being loaded or not being parsed correctly. I would therefore suggest that you first read the file and verify it, and only then move on to the next step of creating the SparkSession with those parameters. To check whether the file is being picked up correctly, try printing its contents as below:
import scala.io.Source

// Print the raw contents of the file passed as args(0) to confirm the path
// is correct and that it holds the expected keys.
val filename = args(0)
for (line <- Source.fromFile(filename).getLines) {
  println(line)
}
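A further check, sketched here under the assumption that args(0) is the filesystem path to application.conf: ConfigFactory.parseFile takes a path (whereas ConfigFactory.load(String) resolves a classpath resource name), and hasPath can confirm the key is present before building the SparkSession. Key names follow the question's application.conf:
import com.typesafe.config.ConfigFactory

// Sketch (inside main): parse the file at the path given in args(0) and check
// that the expected key exists before reading it.
val parsed = ConfigFactory.parseFile(new java.io.File(args(0)))
if (parsed.hasPath("parameters.spark-hive.url")) {
  println("url = " + parsed.getString("parameters.spark-hive.url"))
} else {
  println(s"Key 'parameters.spark-hive.url' not found in ${args(0)}")
}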
Posted on 2020-04-07 19:17:21
I would like to show you a simple example of how to use the com.typesafe.config library.
This is my application.properties under the resources directory:
## Structured Streaming device
device.zookeeper = quickstart.cloudera:2181
device.bootstrap.server = quickstart.cloudera:9092
device.topic = device
device.execution.mode = local
device.data.host = quickstart.cloudera
device.data.port = 44444
## HBase
device.zookeeper.quorum = quickstart.cloudera
device.zookeeper.port = 2181
device.window = 1

Here is the code that reads the properties, with args(0) == device:
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

def main(args: Array[String]): Unit = {
  val conf = ConfigFactory.load // get confs
  val envProps: Config = conf.getConfig(args(0)) // args(0) == device
  val sparkConf = new SparkConf().setMaster(envProps.getString("execution.mode")).setAppName("Device Signal") // get execution.mode conf
  val streamingContext = new StreamingContext(sparkConf, Seconds(envProps.getInt("window"))) // get window conf
  streamingContext.sparkContext.setLogLevel("ERROR")
  val broadcastConfig = streamingContext.sparkContext.broadcast(envProps)
  val topicsSet = Set(envProps.getString("topic")) // get topic conf
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> envProps.getString("bootstrap.server"), // get bootstrap.server conf
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "1",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  val logData: DStream[String] = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topicsSet, kafkaParams)
  ).map(record => {
    record.value
  })
  // output operations and streamingContext.start() / awaitTermination() would follow here
}
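As a small standalone sketch of the same prefix-scoping idea (the object name is illustrative, and it assumes the application.properties shown above is on the classpath):
import com.typesafe.config.ConfigFactory

// Illustrative smoke test: load application.properties from the classpath and
// resolve keys under the "device" prefix, mirroring conf.getConfig(args(0)).
object ConfigSmokeTest {
  def main(args: Array[String]): Unit = {
    val deviceProps = ConfigFactory.load().getConfig("device")
    println(deviceProps.getString("execution.mode")) // local
    println(deviceProps.getString("data.host"))      // quickstart.cloudera
    println(deviceProps.getInt("data.port"))         // 44444
  }
}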
Source: https://stackoverflow.com/questions/61056228