我在AWS Sagemaker中的数据后处理有问题,我需要将一个大文本文件与预测值(~2-10 GB)分割成数百万个小文件(每个用户一个文件~3-10KB)。
源文件和目标文件都存储在S3中。输出文件将使用+ AWS提供给最终用户。
木星笔记本:
import boto3
from sagemaker import get_execution_role
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput, NetworkConfig
role = get_execution_role()
instance_type = 'ml.m4.4xlarge'
ecr_image_full_name = '0123456789.dkr.ecr.eu-central.amazonaws.com/maslick-sagemaker-processing-image:latest'
input_file = 'input.csv'
input_object = 's3://my-awesome-dataset/input.csv'
output_object = 's3://my-awesome-results'
network_config = NetworkConfig(enable_network_isolation=False,
subnets=["subnet-12345", "subnet-67890"],
security_group_ids=["sg-0123456789"])
script_processor = ScriptProcessor(role=role,
image_uri=ecr_image_full_name,
command=['python3'],
instance_count=1,
instance_type=instance_type)
input = ProcessingInput(source=input_object, destination='/opt/ml/processing/input')
output = ProcessingOutput(source='/opt/ml/processing/output', destination=output_object)
script_processor.run(code='callable.py', inputs=[input], outputs=[output], arguments=[input_file])文档:
FROM python:3.7-slim-buster
RUN pip3 install pandas==0.25.3
ENV PYTHONUNBUFFERED=TRUE在callable.py内部,我对输入文件进行预处理,并将结果放入/opt/ml/processing/output,例如:
/opt/ml/processing/output/93faa_654321010000007_latest.json
生成的文件将被ScriptProcessor保存到ScriptProcessor中:
s3://my-awesome-results/93faa_654321010000007_latest.json
输入文件仅仅是一个用csv分隔的csv文件。
654321010000007|1288858|AB|1
654321010000008|1266069|AB|2
654321010000009|0956486|AB|3
654321010000010|1295930|AB|4
654321010000011|0594956|AB|5
654321010000012|1231767|AB|6
654321010000013|1273878|CD|7
654321010000014|1295236|AB|8
654321010000015|1255404|AB|9生成的文件将如下所示(因此,基本上callable.py将遍历每一行并将其转换为json字符串):
{"id": 654321010000007, "article": 1288858, "type": "AB", "rank": 1}callable.py
import hashlib
import json
import sys
from collections import defaultdict
from concurrent.futures.process import ProcessPoolExecutor
from pathlib import Path
import pandas as pd
def saveFilesMultiProcesses(items):
with ProcessPoolExecutor() as executor:
for item in items:
executor.submit(saveFile, item)
def readCsv(input_file):
colnames = ['id', 'article', 'type', 'rank']
df = pd.read_csv('/opt/ml/processing/input/{}'.format(input_file), sep='|', names=colnames)
return df
def processCsv(df):
dicts = []
for row in df.itertuples():
dict = defaultdict(lambda: defaultdict(list))
dict["id"] = row.id
dict["article"] = row.article
dict["type"] = row.type
dict["rank"] = row.rank
dicts.append(dict)
return dicts
def saveFile(item):
hashed_prefix = hashlib.md5(str(item['id']).encode('utf-8')).hexdigest()
short = hashed_prefix[:5]
file_name = short + "_" + str(item['id']) + "_latest.json"
outfile = Path('/opt/ml/processing/output', file_name)
with open(outfile, 'w') as json_file:
json.dump(item, json_file)
if __name__ == '__main__':
input_file = sys.argv[1]
df = readCsv(input_file)
list_of_dicts = processCsv(df)
saveFilesMultiProcesses(list_of_dicts)
print("Done. Wait until all files are saved to S3")我已经能够处理一个小数据集(32 to,13540条记录)。当我尝试120万条记录(2.2GB)时,ScriptProcessor成功地处理了输入文件并将输出文件保存到/opt/ml/processing/output中,但是它未能将它们放入S3中,但有以下错误:
---------------------------------------------------------------------------
UnexpectedStatusException Traceback (most recent call last)
<ipython-input-66-48dccaef0bee> in <module>()
----> 1 script_processor.run(code='callable.py', inputs=[input], outputs=[output], arguments=[input_file])
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/processing.py in run(self, code, inputs, outputs, arguments, wait, logs, job_name, experiment_config)
402 self.jobs.append(self.latest_job)
403 if wait:
--> 404 self.latest_job.wait(logs=logs)
405
406 def _get_user_code_name(self, code):
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/processing.py in wait(self, logs)
726 """
727 if logs:
--> 728 self.sagemaker_session.logs_for_processing_job(self.job_name, wait=True)
729 else:
730 self.sagemaker_session.wait_for_processing_job(self.job_name)
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/session.py in logs_for_processing_job(self, job_name, wait, poll)
3132
3133 if wait:
-> 3134 self._check_job_status(job_name, description, "ProcessingJobStatus")
3135 if dot:
3136 print()
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/session.py in _check_job_status(self, job, desc, status_key_name)
2636 ),
2637 allowed_statuses=["Completed", "Stopped"],
-> 2638 actual_status=status,
2639 )
2640
UnexpectedStatusException: Error for Processing job maslick-sagemaker-processing-image-2020-06-11-15-42-34-593: Failed. Reason: InternalServerError: We encountered an internal error. Please try again.在文档中,编写了Processor类(及其子类ScriptProcessor),用于数据预处理、后处理、特性工程、数据验证和模型评估。显然,它不是用来处理数百万文件的(放到S3上)。
有什么想法吗?提前谢谢。
发布于 2022-04-06 10:22:23
我也有类似的问题。对我来说,最佳的解决方案是使用带火花的Glue作业。但是,我也做了一个测试,这个测试对我来说是有效的,直接在s3上写作而不使用处理输出。
https://stackoverflow.com/questions/62330719
复制相似问题