I am trying to use Python's default logging module in a multiprocessing scenario. I have read:
as well as several other posts about multiprocessing, logging, Python classes and so on. After all that reading I arrived at this piece of code, which I cannot make run properly:
import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler


class Worker(Process):

    def __init__(self, n, q):
        super(Worker, self).__init__()
        self.n = n
        self.queue = q

        self.qh = QueueHandler(self.queue)
        self.root = logging.getLogger()
        self.root.addHandler(self.qh)
        self.root.setLevel(logging.DEBUG)
        self.logger = logging.getLogger("W%i" % self.n)

    def run(self):
        self.logger.info("Worker %i Starting" % self.n)
        for i in xrange(10):
            self.logger.log(INFO, "testing %i" % i)
        self.logger.log(INFO, "Completed %i" % self.n)


def listener_process(queue):
    while True:
        try:
            record = queue.get()
            if record is None:
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            import sys, traceback
            print >> sys.stderr, 'Whoops! Problem:'
            traceback.print_exc(file=sys.stderr)


if __name__ == "__main__":
    mpq = mpQueue(-1)
    root = logging.getLogger()
    h = logging.StreamHandler()
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

    l = logging.getLogger("Test")
    l.setLevel(logging.DEBUG)

    listener = Process(target=listener_process,
                       args=(mpq,))
    listener.start()

    workers = []
    for i in xrange(1):
        worker = Worker(i, mpq)
        worker.daemon = True
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

    mpq.put_nowait(None)
    listener.join()

    for i in xrange(10):
        l.info("testing %i" % i)

    print "Finish"

If I run the code, the output somehow contains duplicated lines, like this:
2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2
2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8
2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9
2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0
2013-12-02 16:44:46,005 MainProcess Test INFO testing 0
2013-12-02 16:44:46,005 MainProcess Test INFO testing 1
2013-12-02 16:44:46,005 MainProcess Test INFO testing 2
2013-12-02 16:44:46,005 MainProcess Test INFO testing 3
2013-12-02 16:44:46,005 MainProcess Test INFO testing 4
2013-12-02 16:44:46,005 MainProcess Test INFO testing 5
2013-12-02 16:44:46,006 MainProcess Test INFO testing 6
2013-12-02 16:44:46,006 MainProcess Test INFO testing 7
2013-12-02 16:44:46,006 MainProcess Test INFO testing 8
2013-12-02 16:44:46,006 MainProcess Test INFO testing 9
Finish

In other questions it has been suggested that the handler is being added more than once, but, as you can see, I only add the stream handler once, in the main method. I have also tested embedding the main method in a class, with the same result.

EDIT: As @max suggested (or what I believe he meant), I modified the Worker class code to:
class Worker(Process):

    root = logging.getLogger()
    qh = None

    def __init__(self, n, q):
        super(Worker, self).__init__()
        self.n = n
        self.queue = q

        if not self.qh:
            Worker.qh = QueueHandler(self.queue)
            Worker.root.addHandler(self.qh)
            Worker.root.setLevel(logging.DEBUG)

        self.logger = logging.getLogger("W%i" % self.n)

        print self.root.handlers

    def run(self):
        self.logger.info("Worker %i Starting" % self.n)
        for i in xrange(10):
            self.logger.log(INFO, "testing %i" % i)
        self.logger.log(INFO, "Completed %i" % self.n)

with the same result. The queue handler is no longer added again and again, but the duplicated log entries are still there, even with only one worker.
EDIT2: I changed the code a little. I replaced the listener process with a QueueListener (which is also what I wanted in the beginning) and moved the main code into a class.
import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler

root = logging.getLogger()
added_qh = False


class Worker(Process):

    def __init__(self, logconf, n, qh):
        super(Worker, self).__init__()
        self.n = n
        self.logconf = logconf

        # global root
        global added_qh

        if not added_qh:
            added_qh = True
            root.addHandler(qh)
            root.setLevel(logging.DEBUG)

        self.logger = logging.getLogger("W%i" % self.n)

        #print root.handlers

    def run(self):
        self.logger.info("Worker %i Starting" % self.n)
        for i in xrange(10):
            self.logger.log(INFO, "testing %i" % i)
        self.logger.log(INFO, "Completed %i" % self.n)


class Main(object):

    def __init__(self):
        pass

    def start(self):
        mpq = mpQueue(-1)
        qh = QueueHandler(mpq)
        h = logging.StreamHandler()
        ql = QueueListener(mpq, h)

        #h.setFormatter(f)
        root.addHandler(qh)

        l = logging.getLogger("Test")
        l.setLevel(logging.DEBUG)

        workers = []

        for i in xrange(15):
            worker = Worker(logconf, i, qh)  # note: logconf is not defined in this snippet as posted
            worker.daemon = True
            worker.start()
            workers.append(worker)

        for worker in workers:
            print "joining worker: {}".format(worker)
            worker.join()

        mpq.put_nowait(None)
        ql.start()
        # listener.join()

        for i in xrange(10):
            l.info("testing %i" % i)


if __name__ == "__main__":
    x = Main()
    x.start()
    time.sleep(10)
    print "Finish"

Now it mostly works, but when I reach a certain number of workers (~15), the main class blocks in the join for some reason and the remaining workers do nothing.
Posted on 2016-05-12 08:08:19
I'm quite late, so you probably don't need the answer anymore. The problem comes from the fact that you already set a handler in your main process, and in your worker you are adding another one. This means that in the worker process, two handlers are actually managing your records: one pushing the log to the queue, and one writing it to the stream.

You can fix this simply by adding one extra line, self.root.handlers = [], to your code. From your original code, the worker's __init__ method would look like this:
def __init__(self, n, q):
    super(Worker, self).__init__()
    self.n = n
    self.queue = q

    self.qh = QueueHandler(self.queue)
    self.root = logging.getLogger()
    self.root.handlers = []
    self.root.addHandler(self.qh)
    self.root.setLevel(logging.DEBUG)
    self.logger = logging.getLogger("W%i" % self.n)

The output now looks like this:
python workers.py
2016-05-12 10:07:02,971 Worker-2 W0 INFO Worker 0 Starting
2016-05-12 10:07:02,972 Worker-2 W0 INFO testing 0
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 1
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 2
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 3
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 4
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 5
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 6
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 7
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 8
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 9
2016-05-12 10:07:02,973 Worker-2 W0 INFO Completed 0
Finish

Posted on 2016-09-13 16:51:41
I came up with a dead-simple workaround. It may not be robust, and I am no expert on the logging module, but it seemed like the best solution for my situation. After trying some code changes (towards allowing an existing logger to be passed in from multiprocessing.get_logger()), I didn't like how much the code had to change, and I came up with a simple, easy-to-understand method instead (and one I wish I had thought of in the first place):

(Working example, done with a multiprocessing Pool)
import logging
import multiprocessing


class FakeLogger(object):
    def __init__(self, q):
        self.q = q

    def info(self, item):
        self.q.put('INFO - {}'.format(item))

    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))

    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))

    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))


def some_other_func_that_gets_logger_and_logs(num):
    # notice the name gets discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2


def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)


if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()

    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]

    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close()  # no more tasks
    pool.join()   # wrap up current tasks

    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)

Of course this may not cover the whole range of logging usage, but I think the concept is simple enough to get working quickly and relatively painlessly. And it should be easy to modify (for example, the lambda discards a possible prefix that could be passed into getLogger).
Posted on 2013-12-02 15:59:34
All your Workers share the same root logger object (obtained in Worker.__init__; the getLogger call always returns the same logger). However, every time you create a Worker, you add a handler (a QueueHandler) to that logger.

So if you create 10 Workers, the root logger will have 10 (identical) handlers on it, which means the output gets repeated 10 times.
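The duplication is easy to reproduce without any multiprocessing at all; here is a minimal sketch (Python 3 syntax; the logger name "dup-demo" is just for illustration):

```python
import io
import logging

# Use a fresh, non-propagating logger so existing root configuration
# does not interfere with the demonstration
log = logging.getLogger("dup-demo")
log.setLevel(logging.INFO)
log.propagate = False

buf = io.StringIO()
# Add two equivalent handlers -- from now on every record is emitted twice
log.addHandler(logging.StreamHandler(buf))
log.addHandler(logging.StreamHandler(buf))

log.info("hello")
print(buf.getvalue())  # "hello" appears twice
```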
Instead, you should make the logger a module attribute, not an instance attribute, and configure it once at module level, not at class level.

(Actually, loggers should be configured once at program level.)
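A minimal sketch of that program-level setup, using the QueueHandler/QueueListener that live in the stdlib's logging.handlers on Python 3 (on Python 2 the same classes come from logutils, as in the question). Each worker installs exactly one handler, and the listener is started before the workers so the queue is drained while they run:

```python
import io
import logging
import logging.handlers
import multiprocessing


def worker(q, n):
    # Configure logging once per worker process: drop any inherited
    # handlers, then install a single QueueHandler
    root = logging.getLogger()
    root.handlers = []
    root.addHandler(logging.handlers.QueueHandler(q))
    root.setLevel(logging.DEBUG)
    logging.getLogger("W%d" % n).info("Worker %d starting", n)


if __name__ == "__main__":
    q = multiprocessing.Queue(-1)

    # The real handler lives only in the main process; it writes to a
    # StringIO here just so the result is easy to inspect
    buf = io.StringIO()
    h = logging.StreamHandler(buf)
    h.setFormatter(logging.Formatter(
        "%(processName)-10s %(name)s %(levelname)-8s %(message)s"))

    listener = logging.handlers.QueueListener(q, h)
    listener.start()  # start draining BEFORE any worker runs

    procs = [multiprocessing.Process(target=worker, args=(q, i))
             for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    listener.stop()  # flushes remaining records, then stops the thread
    print(buf.getvalue())  # one line per worker, no duplicates
```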
https://stackoverflow.com/questions/20332359