首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用flatMap()在PySpark中并行列出S3对象?

如何使用flatMap()在PySpark中并行列出S3对象?
EN

Stack Overflow用户
提问于 2018-11-01 06:40:34
回答 1查看 1.5K关注 0票数 3

我有一个数据帧,其中每一行都包含指向S3中某个位置的前缀。我想使用flatMap()遍历每一行,列出每个前缀中的S3对象,并返回一个新的数据帧,其中包含S3中列出的每个文件一行。

我有这样的代码:

代码语言:javascript
复制
import boto3
s3 = boto3.resource('s3')

def flatmap_list_s3_files(row):
    bucket = s3.Bucket(row.bucket)
    s3_files = []
    for obj in bucket.objects.filter(Prefix=row.prefix):
        s3_files.append(obj.key)

    rows = []
    for f in s3_files:
        row_dict = row.asDict()
        row_dict['s3_obj'] = f
        rows.append(Row(**row_dict))
    return rows

df = <code that loads the dataframe>
df.rdd.flatMap(lambda x: flatmap_list_s3_files(x))).toDF()

唯一的问题是我猜s3对象是不可拾取的?所以我得到了这个错误,我不确定下一步该怎么做:

PicklingError: Cannot pickle files that are not opened for reading

我是一个spark新手,所以我希望有一些其他的应用程序接口或某种方法来并行化S3中的文件列表,并将其与原始数据帧结合在一起。需要说明的是,我并不是试图读取S3文件本身中的任何数据,而是在构建一个表,该表本质上是S3中所有文件的元数据目录。任何建议都将不胜感激。

EN

回答 1

Stack Overflow用户

发布于 2018-11-02 00:39:57

您不能在您的spark集群中发送s3客户端;您需要共享创建一个客户端并在远端实例化它所需的所有信息。我不知道.py,但是在java中,你只需要将路径作为字符串传递,然后将其转换成path对象,调用Path.getFileSystem()并在那里工作即可。Spark工作进程将缓存Filesystem实例,以便快速重用

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53092928

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档