首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >viewflow.io:实现队列任务

viewflow.io:实现队列任务
EN

Stack Overflow用户
提问于 2015-07-28 00:58:42
回答 1查看 686关注 0票数 5

我想用ViewFlow library实现以下用例

问题

由用户启动的特定流的进程在执行芹菜作业之前必须在队列中等待。每个用户都有一个这些进程的队列。根据调度或手动触发,允许队列中的下一个进程继续进行。

示例

我的流中的节点进入命名队列。对于每个队列,应用程序中的其他逻辑确定何时允许下一个任务继续。选择队列中的下一个任务,并调用其激活的done()方法。

示例流程可能如下所示:

代码语言:javascript
复制
class MyFlow(Flow):

    start = flow.Start(...).Next(queue_wait)
    queue_wait = QueueWait("myQueue").Next(job)
    job = celery.Job(...).Next(end)
    end = flow.End()

问题

实现排队的最佳方法是什么?在上面的例子中,我不知道"QueueWait“应该是什么。

我已经通读了文档和视图流代码,但我还不清楚这是否可以使用内置的Node和Activation类(如func.Function )来完成,或者是否需要使用自定义类进行扩展。

EN

回答 1

Stack Overflow用户

发布于 2015-07-30 00:52:13

经过多次实验,我得出了一个可行且简单的解决方案:

代码语言:javascript
复制
from viewflow.flow import base
from viewflow.flow.func import FuncActivation
from viewflow.activation import STATUS


class Queue(base.NextNodeMixin,
            base.UndoViewMixin,
            base.CancelViewMixin,
            base.DetailsViewMixin,
            base.Event):

    """
    Node that halts the flow and waits in a queue. To process the next waiting task
    call the dequeue method, optionally specifying the task owner.

    Example placing a job in a queue::

        class MyFlow(Flow):
            wait = Queue().Next(this.job)
            job = celery.Job(send_stuff).Next(this.end)
            end = flow.End()

        somewhere in the application code:
        MyFlow.wait.dequeue()
        or:
        MyFlow.wait.dequeue(process__myprocess__owner=user)

    Queues are logically separated by the task_type, so new queues defined in a
    subclass by overriding task_type attribute.
    """

    task_type = 'QUEUE'
    activation_cls = FuncActivation

    def __init__(self, **kwargs):
        super(Queue, self).__init__(**kwargs)

    def dequeue(self, **kwargs):
        """
        Process the next task in the queue by created date/time. kwargs is
        used to add task filter arguments, thereby effectively splitting the queue
        into subqueues. This could be used to implement per-user queues.

        Returns True if task was found and dequeued, False otherwise
        """
        filter_kwargs = {'flow_task_type': self.task_type, 'status': STATUS.NEW}
        if kwargs is not None:
            filter_kwargs.update(kwargs)

        task = self.flow_cls.task_cls.objects.filter(**filter_kwargs).order_by('created').first()
        if task is not None:
            lock = self.flow_cls.lock_impl(self.flow_cls.instance)
            with lock(self.flow_cls, task.process_id):
                task = self.flow_cls.task_cls._default_manager.get(pk=task.pk)
                activation = self.activation_cls()
                activation.initialize(self, task)
                activation.prepare()
                activation.done()
            return True

        return False

我试图使其尽可能通用,并支持多个命名队列和子队列的定义,例如每个用户的队列。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31658996

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档