我正在我的本地机器pycharm上编写一些代码。执行是在databricks集群上完成的,而数据存储在azure datalake上。
基本上,我需要列出azure datalake目录中的文件,然后对这些文件应用一些读取逻辑,为此,我使用以下代码
sc = spark.sparkContext
hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path('adl://<Account>.azuredatalakestore.net/<path>')
for f in fs.get(conf).listStatus(path):
print(f.getPath(), f.getLen())上面的代码在databricks笔记本上运行得很好,但是当我尝试使用databricks-connect通过pycharm运行相同的代码时,我得到了以下错误。
"Wrong FS expected: file:///....."经过深入挖掘后发现,代码正在我的本地驱动器中查找“路径”。我在使用python库(os,pathlib)时遇到了类似的问题
在集群上运行其他代码没有问题。
我需要帮助找出如何运行它,以便搜索数据记录,而不是我的本地计算机。
此外,由于某些限制,azure-datalake-store客户端不是一个选项。
发布于 2020-02-23 00:57:58
你可以用这个。
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import java.net.URI
def listFiles(basep: String, globp: String): Seq[String] = {
val conf = new Configuration(sc.hadoopConfiguration)
val fs = FileSystem.get(new URI(basep), conf)
def validated(path: String): Path = {
if(path startsWith "/") new Path(path)
else new Path("/" + path)
}
val fileCatalog = InMemoryFileIndex.bulkListLeafFiles(
paths = SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))),
hadoopConf = conf,
filter = null,
sparkSession = spark)
fileCatalog.flatMap(_._2.map(_.path))
}
val root = "/mnt/{path to your file directory}"
val globp = "[^_]*"
val files = listFiles(root, globp)
files.toDF("path").show()https://stackoverflow.com/questions/60131166
复制相似问题