我已经开始在各种项目中使用Prefect,现在我需要决定在GCP上使用哪种部署策略最好。最好是我喜欢无服务器工作。Comparing Cloud Run, Cloud Functions and App Engine,我倾向于选择后者,因为它没有超时限制,而其他两个分别有9个。15分钟
我很想知道人们是如何无服务器地部署Prefect流的,比如流被调度/触发以进行批处理,而代理在不使用时自动缩小。
或者,一种更经典的方法是在Compute Engine and schedule this via Cloud Scheduler上部署Prefect。但我觉得这有点过时了,没有公平地对待Prefect的功能和未来开发的灵活性。
发布于 2020-04-28 02:27:06
感兴趣的是,人们是如何无服务器地部署Prefect流的,比如流被调度/触发以进行批处理,而代理在不使用时自动缩小。
Prefect在AWS Lambda的无服务器部署上有一个blog post,这是一个很好的蓝图,可以用GCP做同样的事情。这里的挑战是代理扩展-代理通过定期(每隔10秒)轮询后端(无论是Prefect Server的自我部署还是托管的Prefect Cloud)来工作。脑海中浮现的一种可能性是使用云函数来启动进程中的代理,这是由您正在考虑的任何批处理/调度事件触发的。您还可以使用kwarg CLI参数或-max-polls来启动代理以查找运行;如果在您指定的轮询尝试次数之后仍未找到任何内容,则代理将自行关闭。该here或任何特定座席页面上的详细信息。
然而,对于长时间运行的流来说,这可能是低效的,并且您可能会遇到资源上限;如果工作负载足够高,那么可能值得考虑触发自动扩展Dask cluster部署。Prefect supports that natively with Kubernetes,并且有一个Kubernetes agent来与你的集群交互。我认为这将是最优雅和可扩展的解决方案,而不需要走经典的计算引擎路线,我同意这条路线有点过时,没有提供很好的自动伸缩或一流的管理。
对无服务器执行的更好支持已经在路线图上,特别是无服务器代理正在工作中,但我还没有关于何时发布的预计值。
希望这能有所帮助!:)
发布于 2021-10-28 16:29:11
最近添加到Prefect的是使用GCP顶点的Vertex Agent,它是AIP的继承者。Vertex有一个高度可配置的无服务器执行环境,并且没有超时。
发布于 2021-12-17 15:09:28
基本上,有两种黑客方法来解决这个问题。
缓存和持久化数据
默认情况下,Prefect Core将所有数据、结果和缓存状态存储在运行流的Python进程的内存中。但是,如果配置了必要的挂钩,则可以持久化它们并从外部位置检索它们。
Prefect有一个“检查点”的概念,它确保每次任务成功运行时,它的返回值都会根据任务的结果对象和目标中的配置写入持久存储。
@task(result=LocalResult(dir="~/.prefect"), target="task.txt")
def func_task():
return 99完整的代码示例如下所示。在这里,我们使用GCSResult对Google Cloud Bucket进行写入和读取。
import os
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"
from prefect import task, Flow
from prefect.engine.results import LocalResult, GCSResult
@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task1():
print("Task 1")
return "Task 1"
@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task2():
print("Task 2")
return "Task 2"
@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task3():
print("Task 3")
return "Task 3"
@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task4():
print("Task 4")
@task
def task5():
print("Task 5")
@task
def task6():
print("Task 6")
@task
def task7():
print("Task 7")
@task
def task8():
print("Task 8")
# with Flow("This is My First Flow",result=LocalResult(dir="~/prefect")) as flow:
with Flow("this is my first flow", result=GCSResult(bucket="prefect")) as flow:
t1, t2 = task1(), task2()
t3 = task3(upstream_tasks=[t1,t2])
t4 = task4(upstream_tasks=[t3])
t5 = task5(upstream_tasks=[t4])
t6 = task6(upstream_tasks=[t4])
t7 = task7(upstream_tasks=[t2,t6])
t8 = task8(upstream_tasks=[t2,t3])
# run the whole flow
flow_state = flow.run()
# visualize the flow
flow.visualize(flow_state)
# print the state of the flow
print(flow_state.result)发布执行结果
另一种黑客解决方案是将google云函数之前的执行结果发布到随后的执行中。这里,我们假设任务之间没有数据输入和输出依赖。
需要进行一些修改才能实现它。
在publishing
之前更改任务状态的自定义状态处理程序更改任务状态
首先,我们知道flow.run函数在所有任务进入完成状态后结束,无论它是成功还是失败。但是,我们不希望所有任务都在google云函数的单个调用中运行,因为总运行时间可能超过540秒。
因此,将使用任务的自定义状态处理程序。每次任务完成时,我们都会向prefect框架发出ENDRUN信号。然后,它会将剩余任务的状态设置为已取消。
from prefect import task, Flow, Task
from prefect.engine.runner import ENDRUN
from prefect.engine.state import State, Cancelled
num_finished = 0
def my_state_handler(obj, old_state, new_state):
global num_finished
if num_finished >= 1:
raise ENDRUN(state=Cancelled("Flow run is cancelled"))
if new_state.is_finished():
num_finished += 1
return new_state其次,为了让取消状态的任务下一次正确执行,我们必须手动将它们的状态更改为pending。
def run(task_state_dict: Dict[Task, State]) -> Dict[Task, State]:
flow_state = flow.run(task_states=task_state_dict)
task_states = flow_state.result
# change task state before next publish
for t in task_states:
if isinstance(task_states[t], Cancelled):
task_states[t] = Pending("Mocked pending")
# TODO: reset global counter
global num_finished
num_finished = 0
# task state for next run
return task_states第三,有两个基本函数: encoding_data和decode_data。前者将待发布的任务状态序列化,后者将任务状态反序列化为流对象。
# encoding: utf-8
from typing import List, Dict, Any
from prefect.engine.state import State
from prefect import Flow, Task
def decode_data(flow: Flow, data: List[Dict[str, Any]]) -> Dict[Task, State]:
# data as follows:
# [
# {
# "task": {
# "slug": "task1"
# }
# "state": {
# "type": "Success",
# "message": "Task run succeeded(manually set)"
# }
# }
# ]
task_states = {}
for d in data:
tasks_found = flow.get_tasks(d['task']['slug'])
if len(tasks_found) != 1: # 不唯一就不做处理了
continue
state = State.deserialize(
{"message": d['state']['message'],
"type": d['state']['type']
}
)
task_states[tasks_found[0]] = state
return task_states
def encode_data(task_states: Dict[Task, State]) -> List[Dict[str, Any]]:
data = []
for task, state in task_states.items():
data.append({
"task": task.serialize(),
"state": state.serialize()
})
return data最后但并非最不重要的一点是,编排将上述所有部分连接起来。def main(data: List[Dictstr,Any],*args,**kargs) -> List[Dictstr,Any]:task_states =decode_data(流,数据) task_states = run(task_states)返回encode_data(task_states)
if __name__ == "__main__":
evt = []
while True:
data = main(evt)
states = defaultdict(set)
for task in data:
task_type, slug = task['state']['type'], task['task']['slug']
states[task_type].add(slug)
if len(states['Pending']) == 0:
sys.exit(0)
evt = data
# send pubsub message here
# GooglePubsub().publish(evt)
# sys.exit(0)https://stackoverflow.com/questions/61403167
复制相似问题