Python Version 3.7.5
Spark Version 3.0
Databricks Runtime 7.3我目前正在使用我的datalake文件系统中的路径。
这是
p = dbutils.fs.ls('dbfs:/databricks-datasets/nyctaxi')
print(p)
[FileInfo(path='dbfs:/databricks-datasets/nyctaxi/readme_nyctaxi.txt', name='readme_nyctaxi.txt', size=916),
FileInfo(path='dbfs:/databricks-datasets/nyctaxi/reference/', name='reference/', size=0),
FileInfo(path='dbfs:/databricks-datasets/nyctaxi/taxizone/', name='taxizone/', size=0),
FileInfo(path='dbfs:/databricks-datasets/nyctaxi/tripdata/', name='tripdata/', size=0)]现在,为了将其转换为有效的Pathlib Posix对象,我将其传递给一个函数
def create_valid_path(paths):
return Path('/dbfs').joinpath(*[part for part in Path(paths).parts[1:]])tripdata的输出为
PosixPath('/dbfs/databricks-datasets/nyctaxi/tripdata')现在,如果我想在将csvs的子集收集到列表中之后,将其读取到sparkdata帧中。
from pyspark.sql.functions import *
df = spark.read.format('csv').load(paths)这将返回
AttributeError: 'PosixPath' object has no attribute '_get_object_id'现在,我能让它工作的唯一方法是手动地预置路径dbfs:/..并将每一项返回到一个字符串,但是有必要使用Pathlib来执行一些基本的I/O操作。我是否遗漏了一些简单的东西,或者Pyspark只是不能读取pathlib对象吗?
e.g
trip_paths_str = [str(Path('dbfs:').joinpath(*part.parts[2:])) for part in trip_paths]
print(trip_paths_str)
['dbfs:/databricks-datasets/nyctaxi/tripdata/fhv/fhv_tripdata_2015-01.csv.gz',
'dbfs:/databricks-datasets/nyctaxi/tripdata/fhv/fhv_tripdata_2015-02.csv.gz'...]发布于 2020-10-20 23:24:37
那么换成这样做如何呢?
from pyspark.sql.functions import *
import os
def db_list_files(file_path):
file_list = [file.path for file in dbutils.fs.ls(file_path) if os.path.basename(file.path)]
return file_list
files = db_list_files('dbfs:/FileStore/tables/')
df = spark.read.format('text').load(files)
df.show()https://stackoverflow.com/questions/64447107
复制相似问题