首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >django+celery+rabbitmq编码错误与sig-杀死

django+celery+rabbitmq编码错误与sig-杀死
EN

Stack Overflow用户
提问于 2016-10-27 10:28:47
回答 1查看 617关注 0票数 1

我现在正在做一个小项目,它使用芹菜将csv和xlsx文件转换为postgresql表。下面的代码在没有芹菜的情况下工作得很好(除了大型文件),但是在使用芹菜之后会产生一些错误和错误。我在StackOverFlow中寻找过类似的问题,但不知道如何做,也不知道为什么。希望你们能帮我,谢谢。

  1. 第一个错误如下:csv-1 csv-2我认为它与我的编码部分有关,但是我尝试用utf-8-sig和big-5打开它,不起作用。(没有芹菜,它工作得很好)

`

代码语言:javascript
复制
# -*- coding: utf-8 -*-
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.http import HttpResponseRedirect
from django.core.urlresolvers import reverse
from django.contrib import messages
from django.conf import settings
from django.db import connection
from django.views.decorators.csrf import csrf_exempt
from celery import Celery
from celery import task
import json
import csv
import sys
import random
import psycopg2
import xlrd
import openpyxl as pyxl
from .models import Document
from .forms import DocumentForm

app = Celery(
    'tasks',
    broker='amqp://guest:guest@localhost:5672//',
    backend='rpc://'
)
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_RESULT_PERSISTENT = False

@app.task()
def csvwritein(doc):# Transform csv to table
        doc = doc
        conn = psycopg2.connect("dbname='apidb' user='api' host='localhost'
password='eric40502' port='5432'")
 readcur = conn.cursor()
        readcur.execute("select exists(select * from
information_schema.tables where table_name='%s')" % doc.tablename) # check if
same file is already in database
        check = readcur.fetchone()[0]
        try:
                fr = open(doc.path,encoding = 'utf-8-sig')
                dr.delay(fr,doc,check)
                fr.close()
        except Exception as e:
                fr = open(doc.path,encoding = 'big5')
                dr.delay(fr,doc,check)
                fr.close()
        conn.commit()
        readcur.close()

@app.task()
def dr(fr,doc,check): # make datareader as function to keep code 'dry'
        csvt = 0 #count csv reader loop time
        row_id = 1 # used for following id field
        conn = psycopg2.connect("dbname='apidb' user='api' host='localhost'
password='eric40502' port='5432'")
        maincur = conn.cursor()
        writecur = conn.cursor()
        datareader = csv.reader(fr, delimiter=',')
        for row in datareader:
                if csvt == 0: # first time in loop(create field) and check no
same file exists
                        if check == True:
                                app =
''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789')
for i in range(6)])
                                tname = '%s-%s' % (doc.tablename,app
tablename = '"%s-%s"' % (doc.tablename,app)
                                doc.tablename = tname
                                doc.save()
                        else:
                                tablename = '"%s"' % doc.tablename
                        maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY
KEY);" % tablename)
                        row_count = sum(1 for line in datareader)
                        col_count = len(row)
                        frow = row
                        for i in range(0,col_count,1):
                                row[i] = '"%s"' % row[i] # change number to
string
                                maincur.execute("ALTER TABLE %s ADD %s
CITEXT;" % (tablename,row[i]))
                        csvt = csvt+1
                        fr.seek(0)
                        next(datareader)
                elif csvt > 0: # not first time(insert data) and check no
same file exists
                        for j in range(0,col_count,1):
                                if j == 0:
                                        writecur.execute("INSERT INTO %s (%s)
VALUES ('%s');" % (tablename,frow[j],row[j]))
                                else:
                                        writecur.execute("UPDATE %s SET %s =
'%s' WHERE id = '%d';" %(tablename,frow[j],row[j],row_id))
                        csvt = csvt+1
                        row_id = row_id+1
                else:
                        break
        conn.commit()
        maincur.close()
        writecur.close()
        conn.close()
        csvt = 0
        doc = Document.objects.all()

`

  1. 第二个错误是将xlsx文件(约13万行)转换为postgresql表,工作人员在2-3分钟后得到sig-kill。调试消息:

2016-27 06:17:05,227: ERROR/MainProcess Process 'Worker-1‘pid:13829与'signal 9 (SIGKILL)’2016-10-27 06:17:05,328:ERROR/MainProcess任务data.tasks.xlsxwritein5aec4679-c48b-4d07-a0a9-5e4e37fcd24b意外地引发: WorkerLostError('Worker提前退出:信号9(SIGKILL)‘)跟踪(最近一次调用):mark_as_worker_lost human_status(Exitcode)中的文件"/usr/local/lib/python3.4/dist-packages/billiard/pool.py",第1175行,billiard.exceptions.WorkerLostError:工人提前退出:信号9 (SIGKILL)。

代码语言:javascript
复制
#The code continues from the above task.py file
@app.task()
def xlsxwritein(doc): # write into database for file type xlsx
        xlsxt = 0
        conn = psycopg2.connect("dbname='apidb' user='api' host='localhost'
password='eric40502' port='5432'")
        maincur = conn.cursor()
        readcur = conn.cursor()
        writecur = conn.cursor()
        readcur.execute("select exists(select * from
information_schema.tables where table_name='%s')" % doc.tablename) # check if
same file is already in database
        check = readcur.fetchone()[0]
        row_id = 1 # used for following id field
        wb = pyxl.load_workbook(doc.path)
        sheetnames = wb.get_sheet_names()
        ws = wb.get_sheet_by_name(sheetnames[0])
        for rown in range(ws.get_highest_row()):
                if xlsxt == 0:
                        if check == True:
                                app =
''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789')
for i in range(6)])
                                tname = '%s-%s' % (doc.tablename,app)
                                tablename = '"%s-%s"' % (doc.tablename,app)
                                doc.tablename = tname
                                doc.save()
                        else:
                                tablename = '"%s"' % doc.tablename
                        field = [ws.cell(row=1,column=col_index).value for
col_index in range(1,ws.get_highest_column()+1)]
                        maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY
KEY);" % tablename)
                        for coln in range(ws.get_highest_column()):
                                field[coln] = '"%s"' % field[coln] # change
number to string
                                if field[coln] == 'ID':
                                        field[coln] = 'original_id'
                                maincur.execute("ALTER TABLE %s ADD %s
CITEXT;"  % (tablename,field[coln]))
                        xlsxt = xlsxt+1
                elif xlsxt > 0 and check == False: # not first time(insert
data) and check no same file exists
                        for coln in range(ws.get_highest_column()):
                                if coln == 0:
                                        writecur.execute("INSERT INTO %s (%s)
VALUES ('%s');"
%(tablename,field[coln],str(ws.cell(row=rown,column=coln+1).value)))
                                else:
                                        writecur.execute("UPDATE %s SET %s =
'%s' WHERE id = '%d';"
%(tablename,field[coln],str(ws.cell(row=rown+1,column=coln+1).value),row_id))
                        xlsxt = xlsxt+1
                        row_id = row_id+1
                else:
                        break
        conn.commit()
        maincur.close()
        readcur.close()
        writecur.close()
        conn.close()
        xlsxt = 0
EN

回答 1

Stack Overflow用户

发布于 2016-10-27 11:06:44

在论点反序列化过程中,可能出了什么问题。不要传递doc对象,而是尝试传递文件名,然后在任务中读取文件。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40282263

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档