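I have a large object whose type cannot be shared between processes. It has methods to instantiate it and to process data with it.
My current approach is to instantiate the object in the main (parent) process first, then pass it to child processes when certain events occur. The problem is that whenever a child process runs, the object is copied in memory each time, which takes a while. I would like to store it in memory that is available only to the children, so that they do not have to copy it every time they call one of the object's functions.
How can I store an object so that it is available only for the processes' own use?
Edit: code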
from multiprocessing import Process

class MultiQ:
    def __init__(self):
        self.pred = instantiate_predict()  # here I instantiate the big object

    def enq_essay(self, essay):
        p = Process(target=self.compute_results, args=(essay,))
        p.start()

    def compute_results(self, essay):
        # computation in the large object that doesn't modify the object
        predictions = self.pred.predict_fields(essay)
This copies the large object in memory every time. I am trying to avoid that.
Edit 4: a short code sample that runs on the 20 newsgroups data
import sklearn.feature_extraction.text as ftext
import sklearn.linear_model as lm
import multiprocessing as mp
import logging
import os
import numpy as np
import cPickle as pickle
def get_20newsgroups_fnames():
    all_files = []
    for i, (root, dirs, files) in enumerate(os.walk("/home/roman/Desktop/20_newsgroups/")):
        if i > 0:
            all_files.extend([os.path.join(root, file) for file in files])
    return all_files

documents = [unicode(open(f).read(), errors="ignore") for f in get_20newsgroups_fnames()]
logger = mp.get_logger()
formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s',
datefmt = '%H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
mp._log_to_stderr = True
def free_memory():
    """
    Return free memory available, including buffer and cached memory
    """
    total = 0
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            line = line.strip()
            if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')):
                field, amount, unit = line.split()
                amount = int(amount)
                if unit != 'kB':
                    raise ValueError(
                        'Unknown unit {u!r} in /proc/meminfo'.format(u=unit))
                total += amount
    return total

def predict(large_object, essay="this essay will be predicted"):
    """this method copies the large object in memory which is what im trying to avoid"""
    vectorized_essay = large_object[0].transform(essay)
    large_object[1].predict(vectorized_essay)
    report_memory("done")

def train_and_model():
    """this is very similar to the instantiate_predict method from my first code sample"""
    tfidf_vect = ftext.TfidfVectorizer()
    X = tfidf_vect.fit_transform(documents)
    y = np.random.random_integers(0, 1, 19997)
    model = lm.LogisticRegression()
    model.fit(X, y)
    return (tfidf_vect, model)

def report_memory(label):
    f = free_memory()
    logger.warn('{l:<25}: {f}'.format(f=f, l=label))

def dump_large_object(large_object):
    f = open("large_object.obj", "w")
    pickle.dump(large_object, f, protocol=2)
    f.close()

def load_large_object():
    f = open("large_object.obj")
    large_object = pickle.load(f)
    f.close()
    return large_object
if __name__ == '__main__':
    report_memory('Initial')
    tfidf_vect, model = train_and_model()
    report_memory('After train_and_model')
    large_object = (tfidf_vect, model)
    procs = [mp.Process(target=predict, args=(large_object,))
             for i in range(mp.cpu_count())]
    report_memory('After Process')
    for p in procs:
        p.start()
    report_memory('After p.start')
    for p in procs:
        p.join()
    report_memory('After p.join')
Output 1:
19:01:39: [ MainProcess] Initial : 26585728
19:01:51: [ MainProcess] After train_and_model : 25958924
19:01:51: [ MainProcess] After Process : 25958924
19:01:51: [ MainProcess] After p.start : 25925908
19:01:51: [ Process-1] done : 25725524
19:01:51: [ Process-2] done : 25781076
19:01:51: [ Process-4] done : 25789880
19:01:51: [ Process-3] done : 25802032
19:01:51: [ MainProcess] After p.join : 25958272
roman@ubx64:$ du -h large_object.obj
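4.6M large_object.obj
So perhaps the large object is not even that large, and my real problem is the memory used by the tfidf vectorizer's transform method.
Now, if I change the main method to: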
report_memory('Initial')
large_object = load_large_object()
report_memory('After loading the object')
procs = [mp.Process(target=predict, args=(large_object,))
         for i in range(mp.cpu_count())]
report_memory('After Process')
for p in procs:
    p.start()
report_memory('After p.start')
for p in procs:
    p.join()
report_memory('After p.join')
I get the following results:
Output 2:
20:07:23: [ MainProcess] Initial : 26578356
20:07:23: [ MainProcess] After loading the object : 26544380
20:07:23: [ MainProcess] After Process : 26544380
20:07:23: [ MainProcess] After p.start : 26523268
20:07:24: [ Process-1] done : 26338012
20:07:24: [ Process-4] done : 26337268
20:07:24: [ Process-3] done : 26439444
20:07:24: [ Process-2] done : 26438948
20:07:24: [ MainProcess] After p.join : 26542860
Then I changed the main method to:
report_memory('Initial')
large_object = load_large_object()
report_memory('After loading the object')
predict(large_object)
report_memory('After Process')
and got the following results:
Output 3:
20:13:34: [ MainProcess] Initial : 26572580
20:13:35: [ MainProcess] After loading the object : 26538356
20:13:35: [ MainProcess] done : 26513804
20:13:35: [ MainProcess] After Process : 26513804
At this point I have no idea what is going on, but multiprocessing definitely uses more memory.
Posted on 2013-01-21 19:42:17
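Linux uses copy-on-write, which means that when a child process is forked, the global variables in each child share the same memory addresses as in the parent until a value is modified. Only when a value is modified does it get copied.
So, in theory, if the large object is not modified, the child processes can use it without consuming more memory. Let's test that theory.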
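As a side illustration of the mechanism, separate from the benchmark, here is a minimal Python 3 sketch (the rest of the code on this page is Python 2). It assumes a Unix-like system where the `fork` start method is available. `BIG`, `child_sum`, and `run_children` are hypothetical names used only for this example. The children read a module-level global inherited through fork, so the object is never passed to them explicitly.

```python
import multiprocessing as mp

# Hypothetical stand-in for the large read-only object.
BIG = None


def child_sum(queue):
    # Reads the module-level global that the child inherited at fork time.
    queue.put(sum(BIG))


def run_children(obj, n_children=2):
    """Publish obj as a global, fork n_children readers, collect their results."""
    global BIG
    BIG = obj  # must be set before the children are started
    ctx = mp.get_context("fork")  # assumption: fork is available (Unix-like OS)
    queue = ctx.Queue()
    procs = [ctx.Process(target=child_sum, args=(queue,))
             for _ in range(n_children)]
    for p in procs:
        p.start()
    results = [queue.get() for _ in procs]
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    print(run_children(list(range(1000)), n_children=2))
```

Each child only reads `BIG`; whether the operating system ends up copying any pages is what the memory measurements in this answer try to observe.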
Here is your code, decorated with some memory-usage logging:
import sklearn.feature_extraction.text as ftext
import sklearn.linear_model as lm
import multiprocessing as mp
import logging
logger = mp.get_logger()
formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s',
datefmt='%H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
mp._log_to_stderr = True
def predict(essay="this essay will be predicted"):
    """this method copies the large object in memory which is what im trying to avoid"""
    vectorized_essay = large_object[0].transform(essay)
    large_object[1].predict(vectorized_essay)
    report_memory("done")

def train_and_model():
    """this is very similar to the instantiate_predict method from my first code sample"""
    tfidf_vect = ftext.TfidfVectorizer()
    N = 100000
    corpus = [
        'This is the first document.',
        'This is the second second document.',
        'And the third one.',
        'Is this the first document?', ] * N
    y = [1, 0, 1, 0] * N
    report_memory('Before fit_transform')
    X = tfidf_vect.fit_transform(corpus)
    model = lm.LogisticRegression()
    model.fit(X, y)
    report_memory('After model.fit')
    return (tfidf_vect, model)

def free_memory():
    """
    Return free memory available, including buffer and cached memory
    """
    total = 0
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            line = line.strip()
            if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')):
                field, amount, unit = line.split()
                amount = int(amount)
                if unit != 'kB':
                    raise ValueError(
                        'Unknown unit {u!r} in /proc/meminfo'.format(u=unit))
                total += amount
    return total

def gen_change_in_memory():
    f = free_memory()
    diff = 0
    while True:
        yield diff
        f2 = free_memory()
        diff = f - f2
        f = f2

change_in_memory = gen_change_in_memory().next

def report_memory(label):
    logger.warn('{l:<25}: {d:+d}'.format(d=change_in_memory(), l=label))
if __name__ == '__main__':
    report_memory('Initial')
    tfidf_vect, model = train_and_model()
    report_memory('After train_and_model')
    large_object = (tfidf_vect, model)
    procs = [mp.Process(target=predict) for i in range(mp.cpu_count())]
    report_memory('After Process')
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    report_memory('After p.join')
It yields:
21:45:01: [ MainProcess] Initial : +0
21:45:01: [ MainProcess] Before fit_transform : +3224
21:45:12: [ MainProcess] After model.fit : +153572
21:45:12: [ MainProcess] After train_and_model : -3100
21:45:12: [ MainProcess] After Process : +0
21:45:12: [ Process-1] done : +2232
21:45:12: [ Process-2] done : +2976
21:45:12: [ Process-3] done : +3596
21:45:12: [ Process-4] done : +3224
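21:45:12: [ MainProcess] After p.join : -372
The number reported is the change in free memory (including cached and buffers), in KiB. So, for example, the change in free memory between 'Initial' and 'After train_and_model' was about 150MB. Thus, large_object requires about 150MB.
Then, after the 4 child processes have completed, much less memory has been consumed: about 12MB. That memory may be due to the creation of the child processes plus the memory used by the transform and predict methods.
So it appears that large_object is not being copied, since if it were, we should have seen memory consumption increase by about 150MB per child.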
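The figures above can be checked with a few lines of arithmetic. This is a small Python 3 sketch that only sums the deltas printed in the log; the variable and function names are made up for illustration.

```python
# Deltas (KiB) copied from the log above.
parent_deltas_kib = [0, 3224, 153572, -3100]   # Initial .. After train_and_model
child_deltas_kib = [2232, 2976, 3596, 3224]    # the four "done" lines


def kib_to_mib(kib):
    """Convert KiB to MiB."""
    return kib / 1024.0


parent_total = sum(parent_deltas_kib)
child_total = sum(child_deltas_kib)

print("building large_object: %d KiB (about %.0f MiB)"
      % (parent_total, kib_to_mib(parent_total)))
print("all four children:     %d KiB (about %.0f MiB)"
      % (child_total, kib_to_mib(child_total)))
```

If every child had copied the object, the second total would be expected to be on the order of four times the first rather than a small fraction of it.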
Comments on your runs on 20 newsgroups
Here are the changes in free memory:
On the 20 newsgroups data:
| Initial | 0 |
| After train_and_model | 626804 | <-- Large object requires 627M
| After Process | 0 |
| After p.start | 33016 |
| done | 200384 |
| done | -55552 |
| done | -8804 |
| done | -12152 |
| After p.join | -156240 |
So it looks like instantiating the large object takes 627MB. I do not know why an extra 200+MB were consumed by the time the first done was reached.
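For reference, the table above is just the difference between consecutive free-memory readings in Output 1 of the question. A short Python 3 sketch of that derivation follows; the `readings` list is copied from Output 1, and the names are illustrative only.

```python
# (label, free memory in KiB), copied from Output 1 in the question.
readings = [
    ("Initial", 26585728),
    ("After train_and_model", 25958924),
    ("After Process", 25958924),
    ("After p.start", 25925908),
    ("done", 25725524),
    ("done", 25781076),
    ("done", 25789880),
    ("done", 25802032),
    ("After p.join", 25958272),
]


def consumed_between(rows):
    """Return (label, KiB consumed since the previous reading) for each row."""
    out = [(rows[0][0], 0)]
    for (_, prev), (label, cur) in zip(rows, rows[1:]):
        out.append((label, prev - cur))  # positive means free memory dropped
    return out


deltas = consumed_between(readings)
for label, delta in deltas:
    print("| %-22s | %8d |" % (label, delta))
```

A positive value means free memory went down since the previous reading; a negative value means memory was released.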
Using load_large_object:
| Initial | 0 |
| After loading the object | 33976 |
| After Process | 0 |
| After p.start | 21112 |
| done | 185256 |
| done | 744 |
| done | -102176 |
| done | 496 |
| After p.join | -103912 |
Apparently, large_object itself requires only 34MB. The rest of the memory, 627 - 34 = 593MB, must have been used by the fit_transform and fit methods called in train_and_model.
Using a single process:
| Initial | 0 |
| After loading the object | 34224 |
| done | 24552 |
| After Process | 0 |
This seems plausible.
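So the data you have accumulated seems to support the claim that the large object itself is not being copied by each child process. But a new mystery has arisen: why is there such a large consumption of memory between 'After p.start' and the first 'done'? I do not know the answer to that.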
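You could try placing report_memory calls before and after the lines
vectorized_essay = large_object[0].transform(essay)
and
large_object[1].predict(vectorized_essay)
to see where the extra memory is being consumed. My guess is that one of these scikit-learn methods is choosing to allocate this (relatively) large amount of memory.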
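One possible way to do that, sketched in Python 3 and assuming a Linux system with /proc/meminfo. `parse_free_kib`, `free_memory_kib`, and `report_step` are hypothetical helpers written for this example; they are not part of scikit-learn or of the code above.

```python
def parse_free_kib(meminfo_text):
    """Sum the MemFree, Buffers and Cached values (kB) from /proc/meminfo text."""
    wanted = {"MemFree", "Buffers", "Cached"}
    total = 0
    for line in meminfo_text.splitlines():
        key, _, rest = line.partition(":")
        if key.strip() in wanted:
            amount, unit = rest.split()
            if unit != "kB":
                raise ValueError("Unknown unit %r in /proc/meminfo" % unit)
            total += int(amount)
    return total


def free_memory_kib(path="/proc/meminfo"):
    """Read the current free memory (including buffers and cache) in KiB."""
    with open(path) as f:
        return parse_free_kib(f.read())


def report_step(label, func, *args, read_free=free_memory_kib):
    """Call func(*args) and print how much free memory dropped while it ran."""
    before = read_free()
    result = func(*args)
    consumed = before - read_free()
    print("{:<25}: {:+d}".format(label, consumed))
    return result, consumed
```

Inside predict, each of the two calls could then be wrapped separately, for example `report_step("transform", large_object[0].transform, essay)`, so that the log shows which step accounts for the drop. The readings are system-wide, so other activity on the machine adds noise.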
Posted on 2013-02-15 02:01:12
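In the end I used RabbitMQ, specifically the RPC server from the RabbitMQ RPC/Python tutorial. I started as many servers on my machine as there are CPUs. These servers are started only once; each allocates memory for the model and the vectorizer once and keeps it for as long as it runs. There were other advantages as well.
Overall, my code also became cleaner.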
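The load-once idea can be sketched without a message broker. The following Python 3 example is not RabbitMQ and not the tutorial's code; it swaps in plain `multiprocessing` queues to show the same pattern, assuming the `fork` start method is available. `load_model`, `worker`, and `serve` are hypothetical names, and the "model" is a trivial stand-in.

```python
import multiprocessing as mp


def load_model():
    # Hypothetical stand-in for the expensive model/vectorizer loading step.
    return {"offset": 1}


def worker(requests, responses):
    model = load_model()  # done once per worker, then reused for every job
    # Keep serving jobs until the None sentinel arrives.
    for job_id, payload in iter(requests.get, None):
        responses.put((job_id, payload + model["offset"]))


def serve(payloads, n_workers=2):
    """Start long-lived workers, send them jobs, and return results in order."""
    ctx = mp.get_context("fork")  # assumption: Unix-like OS
    requests, responses = ctx.Queue(), ctx.Queue()
    workers = [ctx.Process(target=worker, args=(requests, responses))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for job_id, payload in enumerate(payloads):
        requests.put((job_id, payload))
    results = dict(responses.get() for _ in payloads)
    for _ in workers:
        requests.put(None)  # one sentinel per worker
    for w in workers:
        w.join()
    return [results[i] for i in range(len(payloads))]


if __name__ == "__main__":
    print(serve([1, 2, 3]))
```

The point of the design is that the cost of loading is paid when a worker starts, not once per request; a broker such as RabbitMQ adds the ability to run the workers independently of the process that submits jobs.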
https://stackoverflow.com/questions/14437944