首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ActiveMQ :如何使用fork-join?即。如何在所有子任务完成时发出一条消息

ActiveMQ :如何使用fork-join?即。如何在所有子任务完成时发出一条消息
EN

Stack Overflow用户
提问于 2021-05-04 19:47:45
回答 2查看 104关注 0票数 0

假设你有一些任务结构

代码语言:javascript
复制
Task1
Task2: 1 million separate independent Subtask[i] that can run concurrently
Task3: must run once after ALL Task2 subtasks have completed

所有的Task1、Subtaski和Task3都由MQ消息表示。如何在ActiveMQ上解决此问题?尤其是在所有子任务完成后触发Task3消息。

我知道,这不是排队问题,而是fork-join问题。假设环境要求您必须对其使用ActiveMQ。

允许使用ActiveMQ特性、动态队列和消费者之类的东西。不允许使用外部计数器,如表示Task2进度的数据库行。

EN

回答 2

Stack Overflow用户

发布于 2021-05-05 22:09:16

在这个fork-join问题中隐藏着一个状态管理和可观察性挑战。由于数据库已被排除,您必须依赖于内存中或队列中的某些东西。

  1. 为任务运行创建了一个惟一的id --它很短,但有足够的空间不会像飞机定位器代码那样发生冲突--即。34FDSX
  2. 将任务的所有消息发送到队列控制消息queue://TASK.34FDSX.DATA
  3. Send ://TASK.34FDSX.CONTROL包含任务id和预期的消息总数(包括每个队列也会很有帮助)
  4. 当来自queue://TASK.34FDSX.DATA的消费者完成他们的工作后,他们应该向queue://TASK.34FDSX.DONE messageId发送一条'done‘消息,其中包含他们的messageId或一些标识符。

.CONTROL队列和.DONE队列的使用者应该是相同的进程,并且可以跟踪预期的和总的已完成任务。一旦一切都完成了,他就可以触发事件来触发任务#3。

这种方法提供了“在线”的所有功能,如果在任务完成之前过了太长时间,你也可以让.CONTROL和.DONE阅读器超时。

队列删除可以使用ActiveMQ destination GC完成,或者在一切都成功完成时作为.CONTROL/.DONE读取器中的清理步骤。

优势:

  • No无限阻塞使用者
  • 通过队列和队列度量--队列大小、入队计数、出队计数
  • ,任务的无限开放状态是联机的,并且可以观察到。
  • 整个解决方案可以是多线程的,唯一的要求是对于给定的任务,.CONTROL/.DONE侦听器是相同的使用者,但多个任务可以有单独的.CONTROL/.DONE侦听器进行扩展。
票数 1
EN

Stack Overflow用户

发布于 2021-05-05 11:22:32

这里的问题有点含糊,所以我的答案也必须有点含糊。

“Task2”的百万个独立子任务中的每一个都可以由一条消息表示。所有这些消息都可以在同一队列中。您可以根据需要启动任意数量的使用者,并处理所有这些消息(即执行所有子任务)。只需确保这些使用者使用客户端确认模式或事务会话,以便在处理完消息之前不会将消息从队列中删除。一旦队列中没有更多的消息,你就知道“任务2”已经完成了。

要检测队列何时为空,您可以在队列上有一个“特殊”的使用者,它周期性地打开事务处理的会话,并尝试使用队列中的消息。如果使用者接收到消息,那么您可以回滚事务处理的会话以将消息放回队列,并且您知道队列不是空的(即“任务2”未完成)。如果消费者没有收到消息,那么您就知道队列是空的,您可以发送另一条消息来表明这一点。您可以在发送子任务的所有消息之后,将这个特殊的使用者作为“Task2”的一部分启动,以避免过早地检测到空队列。

需要说明的是,这是一个简单的解决方案。你当然可以根据你的需求增加更多的复杂性,但你的问题只是概述了基本的问题,所以不清楚你还有什么其他需求(如果有的话)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67384173

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档