我最近接了达格斯特,作为气流的替代物。
我还没能理解资源的概念,也无法理解我试图做的事情是可能的,还是可以以不同的方式更好地实现。
我有一个帮助类,如下所示,它可以帮助代码保持干燥。
from dagster import resource, solid, ModeDefinition, pipeline
from dagster_aws.s3 import s3_resource
class HelperAwsS3:
def __init__(self, s3_resource):
self.s3_resource = s3_resource
def s3_list_bucket(self, bucket, prefix):
return self.s3_resource.list_objects_v2(
Bucket=bucket,
Prefix=prefix
)
def s3_download_file(self, bucket, file, local_path):
self.s3_resource.meta.client.download_file(
Bucket=bucket,
Key=file,
Filename=local_path
)
def s3_upload_file(self, bucket, file, local_path):
self.s3_resource.meta.client.upload_file(
Bucket=bucket,
Key=file,
Filename=local_path
)s3_resource实际上是dagster_aws.s3.s3_resource,它将帮助我使用本地AWS产品连接到aws。
当我在下面的s3_resource @Resource部分中进行调用时,我不知道如何将HelperAwsS3传递给。
@resource
def connection_helper_aws_s3_resource(context):
return HelperAwsS3()有什么建议吗?还是我做错了,需要用不同的方式去做?
谢谢你的帮助。
发布于 2021-10-13 16:46:30
我在达格斯拉克频道上发布了同样的问题,齐克利得到了帮助小组的答复。把它贴在这里,以防对别人有帮助-
保留HelperAwsS3类并编写使用s3资源的自己的资源,它可能如下所示:
@resource(required_resource_keys={"s3"})
def connection_helper_aws_s3_resource(context):
return HelperAwsS3(s3_resource=context.resources.s3)(然后确保在模式定义中同时包含s3资源和自定义资源:
@pipeline(mode_defs=[ModeDefinition(
resource_defs={"s3": s3_resource, "connection_helper_aws_s3": connection_helper_aws_s3_resource}
)]):
...https://stackoverflow.com/questions/69553675
复制相似问题