首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在装饰器内运行多处理

在装饰器内运行多处理
EN

Stack Overflow用户
提问于 2012-04-29 08:15:54
回答 1查看 2K关注 0票数 4

我想重申一下关于装饰器内部多处理的问题(在我看来,我之前的问题似乎已经过时了:)。我偶然发现了这个问题,不幸的是我不知道如何解决这个问题。为了需要我的应用程序,我必须在装饰器内使用多处理,但是.当我在装饰器中使用多处理时,我会得到错误:Can't pickle <function run_testcase at 0x00000000027789C8>: it's not found as __main__.run_testcase。另一方面,当我像普通函数wrapper(function,*arg)一样调用我的多处理函数时,它可以工作。这很棘手,但我不知道我做错了什么。我即将得出结论,这是python错误:)。也许有人知道这个问题的解决办法,留下相同的语法。我在Windows上运行这段代码(不幸的是)。

前一个问题:Using multiprocessing inside decorator generates error: can't pickle function...it's not found as

模拟此错误的最简单代码:

代码语言:javascript
复制
from multiprocessing import Process,Event

class ExtProcess(Process):
    def __init__(self, event,*args,**kwargs):
        self.event=event
        Process.__init__(self,*args,**kwargs)

    def run(self):
        Process.run(self)
        self.event.set()

class PythonHelper(object):

    @staticmethod
    def run_in_parallel(*functions):
        event=Event()
        processes=dict()
        for function in functions:
            fname=function[0]
            try:fargs=function[1]
            except:fargs=list()
            try:fproc=function[2]
            except:fproc=1
            for i in range(fproc):
                process=ExtProcess(event,target=fname,args=fargs)
                process.start()
                processes[process.pid]=process
        event.wait()
        for process in processes.values():
            process.terminate()
        for process in processes.values():
            process.join()
代码语言:javascript
复制
class Recorder(object):
    def capture(self):
        while True:print("recording")
代码语言:javascript
复制
from z_helper import PythonHelper
from z_recorder import Recorder

def wrapper(fname,*args):
    try:
        PythonHelper.run_in_parallel([fname,args],[Recorder().capture])
        print("success")
    except Exception as e:
        print("failure: {}".format(e))
代码语言:javascript
复制
from z_wrapper import wrapper
from functools import wraps

class Report(object):
    @staticmethod
    def debug(fname):
        @wraps(fname)
        def function(*args):
            wrapper(fname,args)
        return function

执行:

代码语言:javascript
复制
from z_report import Report
import time

class Test(object):
    @Report.debug
    def print_x(self,x):
        for index,data in enumerate(range(x)):
            print(index,data); time.sleep(1)

if __name__=="__main__":
    Test().print_x(10)

我在上一个版本中添加了@wraps

我的回溯:

代码语言:javascript
复制
Traceback (most recent call last):
  File "C:\Interpreters\Python32\lib\pickle.py", line 679, in save_global
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'run_testcase'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\EskyTests\w_Logger.py", line 19, in <module>
    logger.run_logger()
  File "C:\EskyTests\w_Logger.py", line 14, in run_logger
    self.run_testcase()
  File "C:\EskyTests\w_Decorators.py", line 14, in wrapper
    PythonHelper.run_in_parallel([function,args],[recorder.capture])
  File "C:\EskyTests\w_PythonHelper.py", line 25, in run_in_parallel
    process.start()
  File "C:\Interpreters\Python32\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Interpreters\Python32\lib\multiprocessing\forking.py", line 267, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Interpreters\Python32\lib\multiprocessing\forking.py", line 190, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Interpreters\Python32\lib\pickle.py", line 237, in dump
    self.save(obj)
  File "C:\Interpreters\Python32\lib\pickle.py", line 344, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Interpreters\Python32\lib\pickle.py", line 432, in save_reduce
    save(state)
  File "C:\Interpreters\Python32\lib\pickle.py", line 299, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Interpreters\Python32\lib\pickle.py", line 623, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Interpreters\Python32\lib\pickle.py", line 656, in _batch_setitems
    save(v)
  File "C:\Interpreters\Python32\lib\pickle.py", line 299, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Interpreters\Python32\lib\pickle.py", line 683, in save_global
    (obj, module, name))
_pickle.PicklingError: Can't pickle <function run_testcase at 0x00000000027725C8>: it's not found as __main__.run_testcase
EN

回答 1

Stack Overflow用户

发布于 2012-04-29 09:17:42

multiprocessing模块通过调用从进程上的pickler来“调用”从进程中的函数。这是因为它必须通过它创建的IPC接口向从进程发送函数的名称。pickler计算出要使用的正确名称,并将其发送出去,然后在另一边,unpickler将名称转换回函数。

当函数是类成员时,如果没有帮助,就无法正确地对其进行腌制。这对@staticmethod成员来说更糟,因为他们的类型是function,而不是instancemethod类型,这会愚弄pickler。无需使用multiprocessing,就可以很容易地看到这一点。

代码语言:javascript
复制
import pickle

class Klass(object):
    @staticmethod
    def func():
        print 'func()'
    def __init__(self):
        print 'Klass()'

obj = Klass()
obj.func()
print pickle.dumps(obj.func)

生产:

代码语言:javascript
复制
Klass()
func()
Traceback (most recent call last):
 ...
pickle.PicklingError: Can't pickle <function func at 0x8017e17d0>: it's not found as __main__.func

当您尝试挑选一个常规的、非静态的方法(如obj.__init__ )时,问题就更明显了,因为pickler随后意识到它确实是一个实例方法:

代码语言:javascript
复制
TypeError: can't pickle instancemethod objects

然而,并不是所有的东西都丢失了。你只需要增加一个间接的级别。您可以提供一个普通函数,该函数在目标进程中创建实例绑定,至少发送两个参数:(泡沫化)类实例和函数的名称。我还添加了为完整性而调用函数时使用的任何参数。然后在目标进程中调用这个普通函数,并调用类的成员函数:

代码语言:javascript
复制
def call_name(instance, name, *args = (), **kwargs = None):
    "helper function for multiprocessing: call instance.getattr(name)"
    if kwargs is None:
        kwargs = {}
    getattr(instance, name)(*args, **kwargs)

现在不是(这是从你链接的帖子中复制的):

代码语言:javascript
复制
PythonHelper.run_in_parallel([self.run_testcase],[recorder.capture])

您可能会这样做(您可能会在调用序列上大惊小怪):

代码语言:javascript
复制
PythonHelper.run_in_parallel([call_name, (self, 'run_testcase')],
    [recorder.capture])

(注:这都是未经测试的,可能有各种错误)。

更新

我拿了你贴出来的新代码试了一试。

首先,我必须修复z_report.py中的缩进(去缩进所有class Report)。

一旦完成,运行它就会产生一个与您显示的错误完全不同的错误:

代码语言:javascript
复制
Process ExtProcess-1:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/tmp/t/marcin/z_helper.py", line 9, in run
    Process.run(self)
  File "/usr/local/lib/python2.7/multiprocessing/process.py", line 114, in run
recording
[infinite spew of "recording" messages]

要修复没完没了的“录制”消息:

代码语言:javascript
复制
diff --git a/z_recorder.py b/z_recorder.py
index 6163a87..a482268 100644
--- a/z_recorder.py
+++ b/z_recorder.py
@@ -1,4 +1,6 @@
+import time
 class Recorder(object):
     def capture(self):
-        while True:print("recording")
-
+        while True:
+            print("recording")
+            time.sleep(5)

这就留下了剩下的一个问题:给print_x的错误参数

代码语言:javascript
复制
TypeError: print_x() takes exactly 2 arguments (1 given)

在这一点上,Python实际上为您做了所有正确的事情,只是z_wrapper.wrapper有点过于热情了:

代码语言:javascript
复制
diff --git a/z_wrapper.py b/z_wrapper.py
index a0c32bf..abb1299 100644
--- a/z_wrapper.py
+++ b/z_wrapper.py
@@ -1,7 +1,7 @@
 from z_helper import PythonHelper
 from z_recorder import Recorder

-def wrapper(fname,*args):
+def wrapper(fname,args):
     try:
         PythonHelper.run_in_parallel([fname,args],[Recorder().capture])
         print("success")

这里的问题是,当您到达z_wrapper.wrapper时,函数参数都已打包成一个元组。z_report.Report.debug已经有:

代码语言:javascript
复制
    def function(*args):

因此,这两个参数(在本例中为main.Test实例和值10 )已被设置为元组。您只希望z_wrapper.wrapper将(单个)元组传递给PythonHelper.run_in_parallel,以提供参数。如果您添加了另一个元组,那么元组将被包装到另一个元组中(这次是一个元素)。(您可以通过在print "args:", args中添加z_wrapper.wrapper来看到这一点。)

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/10370705

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档