首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >IPython群集和PicklingError

IPython群集和PicklingError
EN

Stack Overflow用户
提问于 2013-12-21 09:18:47
回答 1查看 320关注 0票数 0

我的问题似乎与This Thread类似,但是,虽然我认为我遵循了建议的方法,但我仍然得到了一个PicklingError。当我在没有发送到IPython集群引擎的情况下在本地运行我的进程时,该函数运行良好。

我在IPyhon的笔记本上使用zipline,所以我首先创建了一个基于zipline.TradingAlgorithm的类

单元格1

代码语言:javascript
复制
from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()

单元格2

代码语言:javascript
复制
%%px --local  # This insures that the Class and modules exist on each engine
import zipline as zpl
import numpy as np

class Agent(zpl.TradingAlgorithm):  # must define initialize and handle_data methods
    def initialize(self):
        self.valueHistory = None
        pass

    def handle_data(self, data):
        for security in data.keys():
            ## Just randomly buy/sell/hold for each security
            coinflip = np.random.random()
            if coinflip < .25:
                self.order(security,100)
            elif coinflip > .75:
                self.order(security,-100)
        pass

单元格3

代码语言:javascript
复制
from zipline.utils.factory import load_from_yahoo

start = '2013-04-01'
end   = '2013-06-01'
sidList = ['SPY','GOOG']
data = load_from_yahoo(stocks=sidList,start=start,end=end)

agentList = []
for i in range(3):
    agentList.append(Agent())

def testSystem(agent,data):
    results = agent.run(data)  #-- This is how the zipline based class is executed
    #-- next I'm just storing the final value of the test so I can plot later
    agent.valueHistory.append(results['portfolio_value'][len(results['portfolio_value'])-1])
    return agent

for i in range(10):
    tasks = []
    for agent in agentList:
        #agent = testSystem(agent,data)  ## On its own, this works!
        #-- To Test, uncomment the above line and comment out the next two 
        tasks.append(lview.apply_async(testSystem,agent,data))
    agentList = [ar.get() for ar in tasks]

for agent in agentList:
    plot(agent.valueHistory)

下面是产生的错误:

代码语言:javascript
复制
PicklingError                             Traceback (most recent call last)/Library/Python/2.7/site-packages/IPython/kernel/zmq/serialize.pyc in serialize_object(obj, buffer_threshold, item_threshold)
    100         buffers.extend(_extract_buffers(cobj, buffer_threshold))
    101 
--> 102     buffers.insert(0, pickle.dumps(cobj,-1))
    103     return buffers
    104 
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

如果我用如下代码覆盖zipline.TradingAlgorithm中的run()方法:

代码语言:javascript
复制
def run(self, data):
    return 1

尝试这样的事情..。

代码语言:javascript
复制
def run(self, data):
    return zpl.TradingAlgorithm.run(self,data)

产生相同的PicklingError。

然后传递到引擎的工作,但显然内部的测试没有执行。由于run是zipline.TradingAlgorithm内部的一个方法,而我并不知道它所做的一切,我如何确保它被传递?

EN

回答 1

Stack Overflow用户

发布于 2013-12-24 07:07:26

看起来zipline TradingAlgorithm对象在运行后是不可拾取的:

代码语言:javascript
复制
import zipline as zpl

class Agent(zpl.TradingAlgorithm):  # must define initialize and handle_data methods
    def handle_data(self, data):
        pass

agent = Agent()
pickle.dumps(agent)[:32] # ok

agent.run(data)
pickle.dumps(agent)[:32] # fails

但对我来说,这表明你应该在引擎上创建代理,并且只来回传递数据/结果(理想情况下,根本不传递数据,或者最多传递一次)。

最小化数据传输可能如下所示:

定义类:

代码语言:javascript
复制
%%px
import zipline as zpl
import numpy as np

class Agent(zpl.TradingAlgorithm):  # must define initialize and handle_data methods
    def initialize(self):
        self.valueHistory = []

    def handle_data(self, data):
        for security in data.keys():
            ## Just randomly buy/sell/hold for each security
            coinflip = np.random.random()
            if coinflip < .25:
                self.order(security,100)
            elif coinflip > .75:
                self.order(security,-100)

加载数据

代码语言:javascript
复制
%%px
from zipline.utils.factory import load_from_yahoo

start = '2013-04-01'
end   = '2013-06-01'
sidList = ['SPY','GOOG']

data = load_from_yahoo(stocks=sidList,start=start,end=end)
agent = Agent()

并运行以下代码:

代码语言:javascript
复制
def testSystem(agent, data):
    results = agent.run(data)  #-- This is how the zipline based class is executed
    #-- next I'm just storing the final value of the test so I can plot later
    agent.valueHistory.append(results['portfolio_value'][len(results['portfolio_value'])-1])

# create references to the remote agent / data objects
agent_ref = parallel.Reference('agent')
data_ref =  parallel.Reference('data')

tasks = []
for i in range(10):
    for j in range(len(rc)):
        tasks.append(lview.apply_async(testSystem, agent_ref, data_ref))
# wait for the tasks to complete
[ t.get() for t in tasks ]

并绘制结果,而不是自己去获取代理

代码语言:javascript
复制
%matplotlib inline
import matplotlib.pyplot as plt

for history in rc[:].apply_async(lambda : agent.valueHistory):
    plt.plot(history)

我对zipline的了解还不够,不知道它对你是否有用。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/20714392

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档