我有一个简单的Raspberry pi + Django +芹菜+ Rabbitmq设置,我用它来发送和接收来自Xbee电台的数据,而用户与web应用程序交互。
为了我的生命,我不能得到拉比特姆(或芹菜?)在控制下,仅仅一天(有时更长的时间),整个系统就会因为某种内存泄漏而崩溃。
我怀疑的是,排队的队伍正在堆积,永远不会被移除。
下面是我在运行几分钟后看到的图片:

似乎所有的队列都处于“就绪”状态。
奇怪的是,工人们实际上确实接收到了消息并运行了任务。
任务非常小,不应超过1秒。我已经验证了任务执行到最后一行,并且应该返回ok。
我不是专家,也不知道我到底在看什么,所以我不确定这是否是正常的行为,我的问题在别处?
我已经将一切设置为守护进程,但是即使在开发模式下运行,结果也是一样的。
在过去的四个小时里,我一直在用谷歌搜索进行调试,发现它让我陷入了困境,而我却找不到清晰的答案。
CONFIGS和代码
在/ect/default/celeryd中,我设置了以下内容:
CELERY_APP="MyApp"
CELERYD_NODES="w1"
# Python interpreter from environment.
ENV_PYTHON="/home/pi/.virtualenvs/myappenv/bin/python"
# Where to chdir at start.
CELERYD_CHDIR="/home/pi/django_projects/MyApp"
# Virtual Environment Setup
ENV_MY="/home/pi/.virtualenvs/myappenv"
CELERYD="$ENV_MY/bin/celeryd"
CELERYD_MULTI="$ENV_PYTHON $CELERYD_CHDIR/manage.py celeryd_multi"
CELERYCTL="$ENV_MY/bin/celeryctl"
CELERYD_OPTS="--app=MyApp --concurrency=1 --loglevel=FATAL"
CELERYD_LOG_FILE="/var/log/celery/%n.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"
CELERYD_USER="celery"
CELERYD_GROUP="celery"tasks.py
@celery.task
def sendStatus(modelContext, ignore_result=True, *args, **kwargs):
node = modelContext#EndNodes.objects.get(node_addr_lg=node_addr_lg)
#check age of message and proceed to send status update if it is fresh, otherwise we'll skip it
if not current_task.request.eta == None:
now_date = datetime.now().replace(tzinfo=None) #the time now
eta_date = dateutil.parser.parse(current_task.request.eta).replace(tzinfo=None)#the time this was supposed to run, remove timezone from message eta datetime
delta_seconds = (now_date - eta_date).total_seconds()#seconds from when this task was supposed to run
if delta_seconds >= node.status_timeout:#if the message was queued more than delta_seconds ago this message is too old to process
return
#now that we know the message is fresh we can proceed to process the contents and send status to xbee
hostname = current_task.request.hostname #the name/key in the schedule that might have related xbee sessions
app = Celery('app')#create a new instance of app (because documented methods didnt work)
i = app.control.inspect()
scheduled_tasks = i.scheduled()#the schedule of tasks in the queue
for task in scheduled_tasks[hostname]:#iterate through each task
xbee_session = ast.literal_eval(task['request']['kwargs'])#the request data in the message (converts unicode to dict)
if xbee_session['xbee_addr'] == node.node_addr_lg:#get any session data for this device that we may have set from model's save override
if xbee_session['type'] == 'STAT':#because we are responding with status update we look for status sessions
app.control.revoke(task['request']['id'], terminate=True)#revoke this task because it is redundant and we are sending update now
page_mode = chr(node.page_mode)#the paging mode to set on the remote device
xbee_global.tx(dest_addr_long=bytearray.fromhex(node.node_addr_lg),
frame_id='A',
dest_addr='\xFF\xFE',
data=page_mode)芹菜飞溅:
-------------- celery@raspberrypi v3.1.23 (Cipater)
---- **** -----
--- * *** * -- Linux-4.4.11-v7+-armv7l-with-debian-8.0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: MyApp:0x762efe10
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: amqp://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. MyApp.celery.debug_task
. clone_app.tasks.nodeInterval
. clone_app.tasks.nodePoll
. clone_app.tasks.nodeState
. clone_app.tasks.resetNetwork
. clone_app.tasks.sendData
. clone_app.tasks.sendStatus
[2016-10-11 03:41:12,863: WARNING/Worker-1] Got signal worker_process_init for task id None
[2016-10-11 03:41:12,913: WARNING/Worker-1] JUST OPENED
[2016-10-11 03:41:12,915: WARNING/Worker-1] /dev/ttyUSB0
[2016-10-11 03:41:12,948: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2016-10-11 03:41:13,101: INFO/MainProcess] mingle: searching for neighbors
[2016-10-11 03:41:14,206: INFO/MainProcess] mingle: all alone
[2016-10-11 03:41:14,341: WARNING/MainProcess] celery@raspberrypi ready.
[2016-10-11 03:41:16,223: WARNING/Worker-1] RAW DATA
[2016-10-11 03:41:16,225: WARNING/Worker-1] {'source_addr_long': '\x00\x13\xa2\x00@\x89\xe9\xd7', 'rf_data': '...^%:STAT:`', 'source_addr': '[*', 'id': 'rx', 'options': '\x01'}
[2016-10-11 03:41:16,458: INFO/MainProcess] Received task: clone_app.tasks.sendStatus[6e1a74ec-dca5-495f-a4fa-906a5c657b26] eta:[2016-10-11 03:41:17.307421+00:00]如果需要,我可以提供更多的细节!谢谢你帮忙解决这个问题。
发布于 2016-10-11 04:37:56
哇,在发完我的问题后,我立刻找到了this post,它已经完全解决了我的问题。
正如我所预期的,ignore_result=True是必需的,我只是不知道它属于哪里。
现在,我没有看到任何队列,除非是在工人正在运行任务的那一刻。:)
下面是tasks.py中的变化:
#From
@celery.task
def sendStatus(modelContext, ignore_result=True, *args, **kwargs):
#Some code here
#To
@celery.task(ignore_result=True)
def sendStatus(modelContext, *args, **kwargs):
#Some code herehttps://stackoverflow.com/questions/39970366
复制相似问题