我正在尝试在 Django 管理命令中使用 ProcessPoolExecutor 来并行获取一些结果。我尝试用下面的代码来实现:
# main codes
import json
import time
import datetime
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from redis import Redis
from django.conf import settings
from django.core.management.base import BaseCommand
from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from utils.cache import pool
from services.analysis.thrift.Analysis import Client, Dashparam
from api.analysis.models import MDashBoard
# Module-level Redis client built on the connection pool imported from utils.cache.
redis_con = Redis(connection_pool=pool)
class AnalysisThriftService(object):
    """Context manager around the analysis service's Thrift transport.

    The constructor body is elided in this excerpt; it is expected to set
    ``self.transport``, which ``__exit__`` closes.
    """

    def __init__(self):
        ...

    def __enter__(self):
        """Return the service itself so it can be bound with ``as``."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the transport on exit; exceptions are not suppressed."""
        self.transport.close()
class Command(BaseCommand):
    """Management command that runs the Thrift dashboard task.

    When ``--dashboard_id`` is omitted, the id of every ``MDashBoard`` row is
    dispatched to a pool of five worker processes.
    """

    help = 'python manage.py --settings "xxx"'

    def add_arguments(self, parser):
        """Register the optional integer ``--dashboard_id`` argument."""
        parser.add_argument('--dashboard_id', type=int, help="ID")

    @staticmethod
    def _handle_with_thrift(dashboard_id):
        """Process one dashboard id; runs inside a worker process.

        Errors are printed with their traceback instead of being raised.
        """
        try:
            print(dashboard_id)
            with AnalysisThriftService() as thrift_server:
                dashboard_result = ...
        except Exception:
            # Catch Exception rather than using a bare ``except`` so that
            # KeyboardInterrupt / SystemExit can still stop the worker.
            import traceback
            traceback.print_exc()

    def handle(self, *args, **options):
        """Entry point: fan out over all dashboards when no id is given."""
        dashboard_id = options["dashboard_id"]
        if dashboard_id is None:
            dashboard_tables = [dashboard.id for dashboard in MDashBoard.objects.all()]
            # Leaving the ``with`` block waits for all submitted work to finish.
            with ProcessPoolExecutor(max_workers=5) as executor:
                executor.map(Command._handle_with_thrift, dashboard_tables)
        else:
            ...

但我总是遇到下面的错误:
Process Process-5:
Process Process-2:
Traceback (most recent call last):
File "D:\python3\lib\multiprocessing\process.py", line 258, in _bootstrap
self.run()
File "D:\python3\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "D:\python3\lib\concurrent\futures\process.py", line 169, in _process_worker
call_item = call_queue.get(block=True)
File "D:\python3\lib\multiprocessing\queues.py", line 113, in get
return _ForkingPickler.loads(res)
File "C:\Users\Domob\Desktop\dev\myapi\analysis\management\commands\dashboard_schedule_task.py", line 15, in <module>
from api.analysis.models import MDashBoard
File "C:\Users\Domob\Desktop\dev\myapi\analysis\models.py", line 4, in <module>
from utils.models import BasicModel, StaticCharField
File "C:\Users\Domob\Desktop\dev\myapi\utils\models.py", line 9, in <module>
class BasicModel(models.Model):
File "C:\Users\Domob\Desktop\dev\venv_myapi\lib\site-packages\django\db\models\base.py", line 103, in __new__
app_config = apps.get_containing_app_config(module)
File "C:\Users\Domob\Desktop\dev\venv_myapi\lib\site-packages\django\apps\registry.py", line 252, in get_containing_app_config
self.check_apps_ready()
File "C:\Users\Domob\Desktop\dev\venv_myapi\lib\site-packages\django\apps\registry.py", line 135, in check_apps_ready
raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

我怎样才能得到预期的结果?
非常感谢。
发布于 2020-08-31 10:30:15
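您需要在子进程的初始化函数中调用 django.setup(),为每个子进程初始化 Django。在 Windows 上,子进程以 spawn 方式启动,会重新导入命令模块,而此时 Django 的应用注册表尚未加载,因此会抛出 AppRegistryNotReady。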
def subprocess_setup():
    """Initialise Django inside a freshly started worker process.

    Intended to be passed as ``initializer=`` to ``ProcessPoolExecutor`` so
    that it runs once in each worker.
    """
    # Imported locally so the snippet is self-contained.
    import django

    django.setup()
    # Could do other things here
    # ...
with ProcessPoolExecutor(max_workers=5, initializer=subprocess_setup) as executor:
    ...

发布于 2022-01-31 16:39:01
我遇到了相同的问题,但采用了不同的解决方法,参考了 this thread 中的建议。
我必须显式地将多进程上下文传递给 ProcessPoolExecutor,才能使其正常工作。代码大致如下:
import multiprocessing
# Request the 'fork' start method explicitly and hand it to the executor.
# NOTE(review): per the multiprocessing docs the 'fork' start method is
# POSIX-only; the traceback in the question shows Windows paths, so confirm
# this approach applies to the target platform.
fork_context = multiprocessing.get_context('fork')
with ProcessPoolExecutor(max_workers=5, mp_context=fork_context) as executor:
    ...

https://stackoverflow.com/questions/63669056
复制相似问题