I am running my tests with log_cli=true. The script:
import logging
import sys
from multiprocessing import Process

logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
logger = logging.getLogger("leapp.actors.quagga_report")


class ActorContext:
    def __init__(self):
        self.log = logger

    def run(self):
        self.log.debug("Some msg")


current_actor_context = ActorContext()


def test_caplog_fails(caplog):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        # The message is logged from a child process.
        p = Process(target=current_actor_context.run)
        p.start()
        p.join()
        assert "Some msg" in caplog.text


def test_caplog_passes(caplog):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        # The message is logged from the test process itself.
        current_actor_context.run()
        assert "Some msg" in caplog.text

pytest's log_cli shows the log message in both tests, but caplog only sees it in the second one.
The first test fails with the following traceback:
-------------------------------- live log call ---------------------------------
| 13:39:20 | 40212 | leapp.actors.quagga_report | DEBUG | test_logger_caplog_fails.py | Some msg
FAILED
tests/test_logger_caplog_fails.py:20 (test_caplog_fails)
Traceback (most recent call last):
  File "/home/azhukov/Dropbox/code/lighting_talks/asyncio_subprocess_shells/tests/test_logger_caplog_fails.py", line 26, in test_caplog_fails
    assert "Some msg" in caplog.text
AssertionError: assert 'Some msg' in ''
 +  where '' = <_pytest.logging.LogCaptureFixture object at 0x7fb8a87f2370>.text

I found a similar question, "Pytest capture not working - caplog and capsys are empty", but in my case the logger has propagate=True.
Posted on 2020-07-23 20:59:16
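Inspired by @hoefling, and still wanting to use caplog, here is a solution. The idea is to create a fixture that reads the queue fed by a QueueHandler and re-emits the log records in the main process, where caplog can capture them.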
import logging
import sys
from contextlib import contextmanager
from logging import handlers
from multiprocessing import Process, Queue

import pytest

logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
logger = logging.getLogger(__name__)


class ActorContext:
    def __init__(self):
        self.log = logger

    def run(self):
        self.log.debug("Some msg")


current_actor_context = ActorContext()


@pytest.fixture()
def caplog_workaround():
    @contextmanager
    def ctx():
        # Records logged while the context is active (including in a child
        # process) are put on this queue.
        logger_queue = Queue()
        logger = logging.getLogger()
        queue_handler = handlers.QueueHandler(logger_queue)
        logger.addHandler(queue_handler)
        try:
            yield
        finally:
            # Detach the handler before draining, so that the re-emitted
            # records are not put back on the queue.
            logger.removeHandler(queue_handler)
        # Back in the main process: re-emit every queued record on the root
        # logger, where caplog can capture it.
        while not logger_queue.empty():
            log_record: logging.LogRecord = logger_queue.get()
            # Note: Logger._log is a private method of the logging module.
            logger._log(
                level=log_record.levelno,
                msg=log_record.message,
                args=log_record.args,
                exc_info=log_record.exc_info,
            )

    return ctx


def test_caplog_already_not_fails(caplog, caplog_workaround):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        with caplog_workaround():
            p = Process(target=current_actor_context.run)
            p.start()
            p.join()
        # The queue is drained when caplog_workaround() exits.
        assert "Some msg" in caplog.text


def test_caplog_passes(caplog, capsys):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        current_actor_context.run()
        assert "Some msg" in caplog.text

https://stackoverflow.com/questions/63052171
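As a variation on the same queue idea, here is a minimal standard-library sketch that does not rely on the private Logger._log: the child attaches a QueueHandler itself, and the parent hands each received record to the logger named in record.name via Logger.handle, so the original logger name is preserved. The function names child_entry, replay_records and run_and_replay are hypothetical, and the None sentinel that marks the end of the stream is an assumption of this sketch, not something the answer above uses.

```python
import logging
import logging.handlers
import multiprocessing as mp


def child_entry(log_queue, logger_name):
    """Runs in the child process: route its log records into the shared queue."""
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(logging.handlers.QueueHandler(log_queue))
    try:
        logging.getLogger(logger_name).debug("Some msg")
    finally:
        # Sentinel (assumption of this sketch): tells the parent we are done.
        log_queue.put(None)


def replay_records(log_queue):
    """Runs in the parent: pass each queued record to the logger it came from.

    Returns the number of records replayed.
    """
    replayed = 0
    while True:
        record = log_queue.get()
        if record is None:
            return replayed
        # Logger.handle() dispatches to the logger's handlers and propagates
        # up to the root logger, as a locally created record would.
        logging.getLogger(record.name).handle(record)
        replayed += 1


def run_and_replay(logger_name="leapp.actors.quagga_report"):
    """Start the child, replay its records in the parent, then join it."""
    log_queue = mp.Queue()
    child = mp.Process(target=child_entry, args=(log_queue, logger_name))
    child.start()
    # Drain the queue before joining, so the child is never left blocked
    # with unflushed queue data.
    count = replay_records(log_queue)
    child.join()
    return count
```

Called inside `with caplog.at_level(...)` in a test, run_and_replay() should make the child's message show up in caplog.text, because the record is handled again in the parent process, where pytest's capture handler is attached. With the spawn or forkserver start methods, child_entry has to live in an importable module.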