首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Google Cloud中运行Prefect flow serverless的最佳实践

在Google Cloud中运行Prefect flow serverless的最佳实践
EN

Stack Overflow用户
提问于 2020-04-24 15:21:09
回答 3查看 1.6K关注 0票数 11

我已经开始在各种项目中使用Prefect,现在我需要决定在GCP上使用哪种部署策略最好。最好是我喜欢无服务器工作。Comparing Cloud Run, Cloud Functions and App Engine,我倾向于选择后者,因为它没有超时限制,而其他两个分别有9个。15分钟

我很想知道人们是如何无服务器地部署Prefect流的,比如流被调度/触发以进行批处理,而代理在不使用时自动缩小。

或者,一种更经典的方法是在Compute Engine and schedule this via Cloud Scheduler上部署Prefect。但我觉得这有点过时了,没有公平地对待Prefect的功能和未来开发的灵活性。

EN

回答 3

Stack Overflow用户

发布于 2020-04-28 02:27:06

感兴趣的是,人们是如何无服务器地部署Prefect流的,比如流被调度/触发以进行批处理,而代理在不使用时自动缩小。

Prefect在AWS Lambda的无服务器部署上有一个blog post,这是一个很好的蓝图,可以用GCP做同样的事情。这里的挑战是代理扩展-代理通过定期(每隔10秒)轮询后端(无论是Prefect Server的自我部署还是托管的Prefect Cloud)来工作。脑海中浮现的一种可能性是使用云函数来启动进程中的代理,这是由您正在考虑的任何批处理/调度事件触发的。您还可以使用kwarg CLI参数或-max-polls来启动代理以查找运行;如果在您指定的轮询尝试次数之后仍未找到任何内容,则代理将自行关闭。该here或任何特定座席页面上的详细信息。

然而,对于长时间运行的流来说,这可能是低效的,并且您可能会遇到资源上限;如果工作负载足够高,那么可能值得考虑触发自动扩展Dask cluster部署。Prefect supports that natively with Kubernetes,并且有一个Kubernetes agent来与你的集群交互。我认为这将是最优雅和可扩展的解决方案,而不需要走经典的计算引擎路线,我同意这条路线有点过时,没有提供很好的自动伸缩或一流的管理。

对无服务器执行的更好支持已经在路线图上,特别是无服务器代理正在工作中,但我还没有关于何时发布的预计值。

希望这能有所帮助!:)

票数 8
EN

Stack Overflow用户

发布于 2021-10-28 16:29:11

最近添加到Prefect的是使用GCP顶点的Vertex Agent,它是AIP的继承者。Vertex有一个高度可配置的无服务器执行环境,并且没有超时。

票数 2
EN

Stack Overflow用户

发布于 2021-12-17 15:09:28

完整的解释在这里:https://jerryan.medium.com/hacking-ways-to-run-prefect-flow-serverless-in-google-cloud-function-bc6b249126e4

基本上,有两种黑客方法来解决这个问题。

  • 使用谷歌云存储将云函数的先前执行结果的任务状态持久保存到其后续执行中。

缓存和持久化数据

默认情况下,Prefect Core将所有数据、结果和缓存状态存储在运行流的Python进程的内存中。但是,如果配置了必要的挂钩,则可以持久化它们并从外部位置检索它们。

Prefect有一个“检查点”的概念,它确保每次任务成功运行时,它的返回值都会根据任务的结果对象和目标中的配置写入持久存储。

代码语言:javascript
复制
@task(result=LocalResult(dir="~/.prefect"), target="task.txt") 
def func_task():
    return 99

完整的代码示例如下所示。在这里,我们使用GCSResult对Google Cloud Bucket进行写入和读取。

代码语言:javascript
复制
import os
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"
from prefect import task, Flow  
from prefect.engine.results import LocalResult, GCSResult

@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task1():
    print("Task 1")
    return "Task 1"

@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task2():
    print("Task 2")
    return "Task 2"

@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task3():
    print("Task 3")
    return "Task 3"

@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task4():
    print("Task 4")

@task
def task5():
    print("Task 5")

@task
def task6():
    print("Task 6")

@task
def task7():
    print("Task 7")

@task
def task8():
    print("Task 8")
    

# with Flow("This is My First Flow",result=LocalResult(dir="~/prefect")) as flow:
with Flow("this is my first flow", result=GCSResult(bucket="prefect")) as flow:
    t1, t2 = task1(), task2()
    t3 = task3(upstream_tasks=[t1,t2])
    t4 = task4(upstream_tasks=[t3])
    t5 = task5(upstream_tasks=[t4])
    t6 = task6(upstream_tasks=[t4])
    t7 = task7(upstream_tasks=[t2,t6])
    t8 = task8(upstream_tasks=[t2,t3])

# run the whole flow
flow_state = flow.run()

# visualize the flow
flow.visualize(flow_state)

# print the state of the flow
print(flow_state.result)

发布执行结果

另一种黑客解决方案是将google云函数之前的执行结果发布到随后的执行中。这里,我们假设任务之间没有数据输入和输出依赖。

需要进行一些修改才能实现它。

在publishing

  • Encode/Decode

之前更改任务状态的自定义状态处理程序更改任务状态

首先,我们知道flow.run函数在所有任务进入完成状态后结束,无论它是成功还是失败。但是,我们不希望所有任务都在google云函数的单个调用中运行,因为总运行时间可能超过540秒。

因此,将使用任务的自定义状态处理程序。每次任务完成时,我们都会向prefect框架发出ENDRUN信号。然后,它会将剩余任务的状态设置为已取消。

代码语言:javascript
复制
from prefect import task, Flow, Task
from prefect.engine.runner import ENDRUN
from prefect.engine.state import State, Cancelled

num_finished = 0

def my_state_handler(obj, old_state, new_state):
    global num_finished
    if num_finished >= 1:
        raise ENDRUN(state=Cancelled("Flow run is cancelled"))

    if new_state.is_finished():  
        num_finished += 1
    return new_state

其次,为了让取消状态的任务下一次正确执行,我们必须手动将它们的状态更改为pending。

代码语言:javascript
复制
def run(task_state_dict: Dict[Task, State]) -> Dict[Task, State]:

flow_state = flow.run(task_states=task_state_dict)
task_states = flow_state.result

# change task state before next publish
for t in task_states:
    if isinstance(task_states[t], Cancelled):
        task_states[t] = Pending("Mocked pending")
        

# TODO: reset global counter
global num_finished
num_finished = 0

# task state for next run
return task_states

第三,有两个基本函数: encoding_data和decode_data。前者将待发布的任务状态序列化,后者将任务状态反序列化为流对象。

代码语言:javascript
复制
# encoding: utf-8
from typing import List, Dict, Any
from prefect.engine.state import State
from prefect import Flow, Task


def decode_data(flow: Flow, data: List[Dict[str, Any]]) -> Dict[Task, State]:
    # data as follows:
    # [
    #     {
    #         "task": {
    #               "slug": "task1"
    #          }
    #         "state": {
    #             "type": "Success",
    #             "message": "Task run succeeded(manually set)"
    #         }
    #     }
    # ]

    task_states = {}
    for d in data:

        tasks_found = flow.get_tasks(d['task']['slug'])
        if len(tasks_found) != 1:  # 不唯一就不做处理了
            continue

        state = State.deserialize(
            {"message": d['state']['message'],
             "type": d['state']['type']
             }
        )
        task_states[tasks_found[0]] = state

    return task_states


def encode_data(task_states: Dict[Task, State]) -> List[Dict[str, Any]]:
    data = []
    for task, state in task_states.items():
        data.append({
            "task": task.serialize(),
            "state": state.serialize()
        })
    return data

最后但并非最不重要的一点是,编排将上述所有部分连接起来。def main(data: List[Dictstr,Any],*args,**kargs) -> List[Dictstr,Any]:task_states =decode_data(流,数据) task_states = run(task_states)返回encode_data(task_states)

代码语言:javascript
复制
if __name__ == "__main__":
    evt = []

    while True:
        data = main(evt)

        states = defaultdict(set)
        for task in data:
            task_type, slug = task['state']['type'], task['task']['slug']
            states[task_type].add(slug)
        if len(states['Pending']) == 0:
            sys.exit(0)

        evt = data
        
        # send pubsub message here
        # GooglePubsub().publish(evt)
        # sys.exit(0)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61403167

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档