首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Lambda + awswrangler:处理“大型”拼花文件时性能不佳

Lambda + awswrangler:处理“大型”拼花文件时性能不佳
EN

Stack Overflow用户
提问于 2022-06-14 19:33:00
回答 2查看 727关注 0票数 1

我目前正在编写一个Lambda函数,平均使用Python和AWS wrangler函数从100 to o 200 to读取拼图文件。这样做的目的是读取文件并将它们转换为csv:

代码语言:javascript
复制
import awswrangler as wr
from io import StringIO

print('Loading function')

s3 = boto3.client('s3')
dest_bucket = "mydestbucket"

def lambda_handler(event, context):
    # Get the object from the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        print("CONTENTO TYPE: " + response['ContentType'])

        if key.endswith('.parquet'):
            dfs = wr.s3.read_parquet(path=['s3://' + bucket + '/' + key], chunked=True, use_threads=True)
            
            count=0
            for df in dfs:
                csv_buffer = StringIO()
                df.to_csv(csv_buffer)
                s3_resource = boto3.resource('s3')
                #s3_resource.Object(dest_bucket, 'dfo.csv').put(Body=df)
                s3_resource.Object(dest_bucket, 'dfo_' + str(count) + '.csv').put(Body=csv_buffer.getvalue())
                count += 1
                
            return "File written" 

当我使用小文件时,这个函数可以正常工作,但是一旦我尝试处理大文件(100 it ),它就会给出一个超时。

我已经为Lambda分配了3GB的内存,并将超时设置为10分钟,但是,它似乎不起作用。

除了分配更多内存外,你知道如何提高性能吗?

谢谢!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-06-22 18:02:09

我解决了这个问题,通过使用快速拼花创建了一个图层,它以比aws wrangler更好的方式处理内存。

票数 0
EN

Stack Overflow用户

发布于 2022-07-21 16:20:11

代码语言:javascript
复制
from io import StringIO
from datetime import datetime

import boto3
import fastparquet as fp
import s3fs
import urllib.parse


    #S3 fs initialization
    s3_fs = s3fs.S3FileSystem()
    fs = s3fs.core.S3FileSystem()

    s3fs_path = fs.glob(path=s3_path)
    my_open = s3_fs.open

    # Read parquet object using fastparquet
    fp_obj = fp.ParquetFile(s3fs_path, open_with=my_open)

    # Filter columns and build a pandas df
    new_df = fp_obj.to_pandas()

    # csv buffer to perform the parquet --> csv transformation
    csv_buffer = StringIO()
    new_df.to_csv(csv_buffer)

    s3_resource.Object(
            dest_bucket,
            f"{file_path}",
        ).put(Body=csv_buffer.getvalue())
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72622467

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档