I have the following code to test Flink and Hive integration. I submit the application with flink run -m yarn-cluster .... The hiveConfDir is a local directory that resides on the machine from which I submit the application. My question is: when the main class runs in the cluster, how is Flink able to read this local directory? Thanks!
package org.example.app

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.types.Row

object FlinkBatchHiveTableIntegrationTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)

    val name = "myHiveCatalog"
    val defaultDatabase = "default"
    // How can Flink read this local directory?
    val hiveConfDir = "/apache-hive-2.3.7-bin/conf"
    val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir)
    tenv.registerCatalog(name, hive)
    tenv.useCatalog(name)

    val sql = "select * from testdb.t1"
    val table = tenv.sqlQuery(sql)
    table.printSchema()
    table.toAppendStream[Row].print()

    env.execute("FlinkHiveIntegrationTest")
  }
}

Posted on 2021-01-06 09:22:14
Looks like I found the answer. The application is submitted with flink run -m yarn-cluster. When submitted this way, the application's main method runs on the client machine where Hive is installed, so it can read Hive's conf dir. Only the job graph produced by main (with the Hive configuration already resolved) is shipped to the cluster.
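Since main runs on the submitting client, a quick local check of the path can fail fast with a clear message instead of a confusing error from inside HiveCatalog. A minimal sketch using only the JDK (no Flink dependency); the object and method names here are hypothetical, not part of any Flink API:

```scala
import java.nio.file.{Files, Paths}

object HiveConfDirCheck {

  // Returns true if hive-site.xml exists under the given directory.
  // Because main() runs on the client in per-job YARN mode, this path
  // is resolved against the client machine's local filesystem.
  def validateHiveConfDir(hiveConfDir: String): Boolean = {
    val siteFile = Paths.get(hiveConfDir, "hive-site.xml")
    Files.isRegularFile(siteFile)
  }

  def main(args: Array[String]): Unit = {
    val dir = if (args.nonEmpty) args(0) else "/apache-hive-2.3.7-bin/conf"
    if (!validateHiveConfDir(dir))
      sys.error(s"hive-site.xml not found under $dir on this (client) machine")
    println(s"Found hive-site.xml under $dir")
  }
}
```

Calling validateHiveConfDir(hiveConfDir) before constructing the HiveCatalog makes the client-side nature of the path explicit.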
https://stackoverflow.com/questions/65575979