我有一个python脚本来归档一些数据(从mongo导出到本地json文件,然后上传到s3)。
是否有更好的方法来实现/改进当前流程?在失败的情况下,应该重试每个步骤;如果在设定次数的重试之后该步骤仍然失败,则不应再运行后续步骤,而是继续对其他记录进行归档。
# Archive each snapshot: export from Mongo to a local JSON file, upload to S3,
# validate, then purge the source documents and track progress in a metadata
# collection.  Each fallible step is retried up to 3 times; if a step still
# fails, the remaining steps are skipped and the next metadata_id is processed.
# NOTE(review): `archived` is never incremented in this snippet — presumably
# updated further down, outside the visible excerpt; confirm.
archived = 0
flow_status = False
for metadata_id in metadata_ids:
    logging.info("Archiving snapshot with metadata_id %s", metadata_id)
    # Export snapshot to a temp JSON file
    for retry in range(0, 3):
        flow_status = dump_snapshot(mongo, database, collection, str(metadata_id), folder_path, dry_run)
        if not flow_status:
            logging.info("Failed to export snapshot %s to json file. Retry no %s", str(metadata_id), retry)
        else:
            logging.info("Successfully exported snapshot %s to json file.", str(metadata_id))
            break
    if not flow_status:
        logging.error("Snapshot archiving flow failed for %s. Failed to export to json file. ", str(metadata_id))
        continue
    # Save the temp file to S3
    file_path = folder_path + '/' + database + '/' + collection + '/' + str(metadata_id) + '.json'
    aws_path = "archived_snapshots/" + database + '/' + collection + '/' + str(metadata_id) + '.json'
    for retry in range(0, 3):
        flow_status = aws.upload_file(file_path, aws_path, dry_run)
        if not flow_status:
            logging.info("Failed to save snapshot file %s to s3. Retry no %s", str(metadata_id), retry)
        else:
            logging.info("Successfully saved snapshot %s to s3.", str(metadata_id))
            break
    if not flow_status:
        logging.error("Snapshot archiving flow failed for %s. Failed to saved file to s3. ", str(metadata_id))
        continue
    # Validate the file exists on S3 and has the correct checksum
    if not aws.check_file_exists(aws_path) or not aws.validate_s3_obj(file_path, aws_path):
        logging.error("Snapshot archiving flow failed for %s. Failed to validate file. ", str(metadata_id))
        continue
    # Delete temp file
    if not dry_run:
        delete_file(file_path)
    # Set the archive_status to "ArchivedToPurge" in the MongoDB metadata collection
    metadata_collection = collection + "_metadata"
    for retry in range(0, 3):
        flow_status = update_archive_status(mongo.client[database][metadata_collection], metadata_collection,
                                            metadata_id, archiving_status.ArchivedToPurge.name, dry_run)
        if not flow_status:
            logging.info("Failed to update snapshot metadata for %s to %s. Retry no %s", metadata_collection +
                         "/" + str(metadata_id), archiving_status.ArchivedToPurge.name, retry)
        else:
            logging.info("Updated snapshot metadata for %s to %s", metadata_collection + "/" + str(metadata_id),
                         archiving_status.ArchivedToPurge.name)
            break
    if not flow_status:
        logging.error("Snapshot archiving flow failed for %s. Failed to update metadata to %s", str(metadata_id),
                      archiving_status.ArchivedToPurge.name)
        continue
    # Delete the documents in MongoDB
    logging.info("Deleting snapshots with metadata_id %s from collection %s", str(metadata_id), collection)
    for retry in range(0, 3):
        if dry_run:
            # Dry run: skip deletion; flow_status keeps the (True) value from
            # the previous step, so the failure check below is not triggered.
            break
        records_count = mongo_count(mongo.client[database][collection], match={"metadata_id": metadata_id})
        deleted_count = mongo_delete(mongo.client[database][collection], {"metadata_id": metadata_id})
        flow_status = deleted_count == records_count
        if not flow_status:
            logging.info("Failed to delete all records of snapshot with metadata_id %s. Deleted %s out of %s "
                         "records", str(metadata_id), deleted_count, records_count)
        else:
            logging.info("Deleted %s records of snapshot with metadata_id %s from collection %s",
                         deleted_count, str(metadata_id), collection)
            break
    if not flow_status:
        logging.error("Snapshot archiving flow failed for %s. Failed to delete all snapshot records.",
                      str(metadata_id))
        continue
    # Set the archive_status to "ArchivedAndPurged" in the MongoDB metadata collection.
    for retry in range(0, 3):
        flow_status = update_archive_status(mongo.client[database][metadata_collection], metadata_collection,
                                            metadata_id, archiving_status.ArchivedAndPurged.name, dry_run)
        if not flow_status:
            # Bug fix: these two log lines originally reported
            # ArchivedToPurge (copy-paste from the earlier loop) even though
            # this step sets ArchivedAndPurged.
            logging.info("Failed to update snapshot metadata for %s to %s. Retry no %s", metadata_collection +
                         "/" + str(metadata_id), archiving_status.ArchivedAndPurged.name, retry)
        else:
            logging.info("Updated snapshot metadata for %s to %s", metadata_collection + "/" + str(metadata_id),
                         archiving_status.ArchivedAndPurged.name)
            break
    if not flow_status:
        logging.error("Snapshot archiving flow failed for %s. Failed to update metadata to %s.", str(metadata_id),
                      archiving_status.ArchivedAndPurged.name)
        continue

有什么建议吗?
发布于 2021-07-27 13:58:46
很好的代码。
Python 的 for 循环带有 else 子句,我认为它非常适合这里:

for retry in range(3):
    if ...:
        break
else:
    # else-part

如果循环没有被 break 中断,就会执行 else 部分。所以:
# With for/else the flow_status flag and the follow-up check disappear:
# `break` on success skips the else clause; if all retries fail, the loop
# runs to completion and the else clause logs the flow failure.
# Bug fix: in the flattened original the `break` sat in the failure branch,
# which would abort after the first failure and never reach the else clause;
# it belongs in the success branch.
for retry in range(0, 3):
    if dump_snapshot(mongo, database, collection, str(metadata_id), folder_path, dry_run):
        logging.info("Successfully exported snapshot %s to json file.", str(metadata_id))
        break
    logging.info("Failed to export snapshot %s to json file. Retry no %s", str(metadata_id), retry)
else:
    logging.error("Snapshot archiving flow failed for %s. Failed to export to json file. ", str(metadata_id))
    continue

这样,代码就可以去掉 flow_status 和额外的检查。
其他选择是:为这些操作创建类层次结构并使用异常(而不是 continue)——循环内的代码很短,循环外的代码很长;或者用 lambda 封装操作,并把所有日志消息作为参数传给一个重试函数。第一种方案较长,第二种可读性有点差,比如:
# Example: a single retry() call replaces the whole export step.  The
# messages are pre-formatted with the metadata id; the remaining
# "Retry no %d" placeholder is presumably filled in later by retry()
# via logging's lazy %-formatting — TODO confirm against retry()'s body.
retry(3,
lambda: dump_snapshot(mongo, database, collection, str(metadata_id), folder_path, dry_run),
"Successfully exported snapshot {} to json file.".format(str(metadata_id)),
"Failed to export snapshot {} to json file. Retry no %d".format(str(metadata_id)),
"Snapshot archiving flow failed for {}. Failed to export to json file. ".format(str(metadata_id)) )
def retry(count, f, success, fail_attempt, fail):
    """Call f() up to `count` times, returning as soon as it succeeds.

    On each truthy f() result, `success` is logged and the function returns.
    On a falsy result, `fail_attempt` is logged (its "%d" placeholder is
    filled lazily by logging with the attempt number) and f() is retried.
    `fail` is the message for the terminal failure raised after the loop
    (outside this excerpt).
    """
    for attempt in range(count):
        if f():
            logging.info(success)
            return
        # Bug fix: the original called logging.info(fail_attempt, retry),
        # but `retry` is undefined inside the function (NameError); the
        # loop variable `attempt` is what the message needs.
        logging.info(fail_attempt, attempt)
    raise RetryFailedException(fail)

反正也不算太糟。但是,它需要为 count 提供默认值,并为日志消息使用关键字参数,以便进行如下调用:
retry(lambda: dump_snapshot(mongo, database, collection, str(metadata_id), folder_path, dry_run),
      # count = 3 - skipped
      success = "Successfully exported snapshot {} to json file.".format(str(metadata_id)),
      fail = "Failed to export snapshot {} to json file. Retry no %d".format(str(metadata_id)),
      failure = "Snapshot archiving flow failed for {}. Failed to export to json file. ".format(str(metadata_id)) )

https://codereview.stackexchange.com/questions/264445
复制相似问题