我有工作所依赖的资料来源。在Google PubSub中,每个源都有一个单独的主题;当更新一个源时,它在相应的主题订阅中发送一条消息。当所有源被更新时(即当每个订阅中至少有一条新消息时),作业就可以启动。这项工作是按气流安排的。DAG从一系列并行任务开始,每个订阅都检查是否发布了新消息,但没有对其进行分类。下一个任务等待所有前面的任务,并使用XCOM查看 all 是否包含一条消息。在这种情况下,它将继续执行作业(它首先对消息进行分类),否则它将停止。通过这种方式,我只在所有消息都可用时才确认消息,并使用PubSub作为协调器。消息频率最多每天一次或两次。
基本上,我使用PubSub作为保持“状态”的方式。假设我有不同的工作依赖于相同的来源。我可以为每一项工作创建一个相同主题的订阅,这一切都很好。
是否有更好的方法/工具/框架来做到这一点?
发布于 2021-01-04 19:26:55
根据您拥有的消息量,并从我以前的实现中,我可以建议您在Firestore中持久化状态:无服务器、负担得起、快速.
当消息发布时,触发一个函数,该函数在Firestore中保持状态。
然后,触发所需的进程数,查询Firestone以检查所有状态是否正常,并继续或停止。
这是我的同步模式。不是最好的!
无论如何,如果您创建每个进程的订阅,它也可以工作。消息在每个订阅中重复,因此您可以独立地处理它们。
https://stackoverflow.com/questions/65565657
复制相似问题