首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在作业处理过程中更改python-gearman工作任务

在作业处理过程中更改python-gearman工作任务
EN

Stack Overflow用户
提问于 2011-03-19 14:54:53
回答 2查看 1.2K关注 0票数 0

我正在尝试更改python工作人员在其工作周期中可用的任务。我这样做的原因是允许我对我的工作进程进行一点控制,并允许它们从数据库中重新加载。我需要每个工作人员定期重新加载,但我不想简单地扼杀进程,我希望服务能够不断地可用,这意味着我必须批量地重新加载。所以我会让4名工人重新装载,而另4名工人可以处理,然后重新加载接下来的4名工人。

进程:

  1. 启动重装过程4次。
    1. 取消注册reload进程
    2. 重新加载数据集
    3. 注册finishReload任务
    4. 返回

  1. 重复步骤1,直到没有注册reload任务的工作人员为止。
  2. 启动finishReload(1)任务,直到没有可用finishReload任务的工作人员为止。

(1) finishReload任务取消注册finishReload任务并注册reload任务,然后返回。

现在,我遇到的问题是,当我更改工作进程可用的任务时,作业会失败。没有错误消息或异常,只是gearmand日志中的一个“错误”。下面是一个复制问题的快速程序。

工人

代码语言:javascript
复制
import gearman 
def reversify(gmWorker, gmJob): 
        return "".join(gmJob.data[::-1]) 
def strcount(gmWorker, gmJob): 
        gmWorker.unregister_task('reversify')  # problem line 
        return str(len(gmJob.data)) 
worker = gearman.GearmanWorker(['localhost:4730']) 
worker.register_task('reversify', reversify) 
worker.register_task('strcount', strcount) 
while True: 
        worker.work() 

客户端

代码语言:javascript
复制
import gearman 
client = gearman.GearmanClient(['localhost:4730']) 
a = client.submit_job('reversify', 'spam and eggs') 
print a.result 
>>> sgge dna maps 

a = client.submit_job('strcount', 'spam and eggs') 
...

如果有什么我可以澄清的,请告诉我。

编辑:我知道有人会要求看我提到的日志。我也在谷歌和那里有日志上发布了这个问题给gearman小组。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2011-03-21 15:46:50

看起来,对GearmanWorker类进行子类化并添加几个标志可以解决此问题。在开始向服务器发出新命令之前,我需要允许作业完成,这似乎会中断当前作业。因此,如果我们覆盖on_job_complete函数,我们可以检查启用/禁用标志,并在发出send_job_complete命令后对这些标志进行操作。新的工人方案如下:

工人

代码语言:javascript
复制
import gearman

def reversify(gmWorker, gmJob):
        return "".join(gmJob.data[::-1])

def enable_reversify(gmWorker, gmJob):
        myWorker.enableReversify = 1
        return 'OK'

def strcount(gmWorker, gmJob):
        myWorker.enableReversify = -1
        return str(len(gmJob.data))

class myWorker(gearman.GearmanWorker):

        enableReversify = 0 # 0 = do nothing, -1 = turn off, 1 = turn on

        def on_job_complete(self, current_job, job_result):
                self.send_job_complete(current_job, job_result)
                ### check the flag here and enable or disable tasks ###
                if myWorker.enableReversify == -1:
                        self.unregister_task('reversify')
                if myWorker.enableReversify == 1:
                        self.register_task('reversify', reversify)
                myWorker.enableReversify = 0 # reset the flag
                return True

worker = myWorker(['localhost:4730']) 
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
worker.register_task('enableReversify', enable_reversify)

while True:
        worker.work() 
票数 1
EN

Stack Overflow用户

发布于 2011-03-21 01:56:25

快速地看一看,问题似乎是,您正在开始一个作业,然后注销工人在作业服务器完成之前完成该工作的能力。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/5362830

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档