我有一个数据帧,其中每一行都包含指向S3中某个位置的前缀。我想使用flatMap()遍历每一行,列出每个前缀中的S3对象,并返回一个新的数据帧,其中包含S3中列出的每个文件一行。
我有这样的代码:
import boto3
s3 = boto3.resource('s3')
def flatmap_list_s3_files(row):
bucket = s3.Bucket(row.bucket)
s3_files = []
for obj in bucket.objects.filter(Prefix=row.prefix):
s3_files.append(obj.key)
rows = []
for f in s3_files:
row_dict = row.asDict()
row_dict['s3_obj'] = f
rows.append(Row(**row_dict))
return rows
df = <code that loads the dataframe>
df.rdd.flatMap(lambda x: flatmap_list_s3_files(x))).toDF()唯一的问题是我猜s3对象是不可拾取的?所以我得到了这个错误,我不确定下一步该怎么做:
PicklingError: Cannot pickle files that are not opened for reading
我是一个spark新手,所以我希望有一些其他的应用程序接口或某种方法来并行化S3中的文件列表,并将其与原始数据帧结合在一起。需要说明的是,我并不是试图读取S3文件本身中的任何数据,而是在构建一个表,该表本质上是S3中所有文件的元数据目录。任何建议都将不胜感激。
发布于 2018-11-02 00:39:57
您不能在您的spark集群中发送s3客户端;您需要共享创建一个客户端并在远端实例化它所需的所有信息。我不知道.py,但是在java中,你只需要将路径作为字符串传递,然后将其转换成path对象,调用Path.getFileSystem()并在那里工作即可。Spark工作进程将缓存Filesystem实例,以便快速重用
https://stackoverflow.com/questions/53092928
复制相似问题