我在多进程(multiprocessing)环境中使用 mock.patch 时遇到了困难,而在不使用多进程时 mock.patch 工作正常。文件名:test_mp.py
import multiprocessing
import mock
def inner():
    """Return the result of calling ``sub()``.

    ``sub`` is resolved from the module globals on every call, so replacing
    the module attribute (e.g. with ``mock.patch('test_mp.sub', ...)``)
    changes what this function returns in the process where the patch is
    active.
    """
    return sub()
def sub():
    """Return the fixed string ``"abc"``; this is the function the tests patch."""
    return "abc"
def test_local():
    """Check that ``inner()`` returns ``"abc"`` when called directly, unpatched."""
    assert inner() == "abc"
def test_mp():
    """Check that ``inner`` returns ``'abc'`` when executed by a pool worker."""
    with multiprocessing.Pool() as pool:
        # pool.apply blocks until the worker has returned the result.
        assert pool.apply(inner, args=[]) == 'abc'
def test_mock():
    """Check that patching ``test_mp.sub`` changes ``inner()`` in this process."""
    with mock.patch('test_mp.sub', return_value='xxx') as xx:
        assert inner() == "xxx"
        # The mock lives in this process, so the call is recorded on it.
        xx.assert_called_once()
def test_mp_mock():
    """Try to observe a ``mock.patch`` of ``test_mp.sub`` from a pool worker.

    This is the failing case reported by the author: the worker returns
    ``'abc'``, i.e. the patch applied in this process is not seen by the
    code running in the pool worker.
    """
    with multiprocessing.Pool() as pool:
        with mock.patch('test_mp.sub', return_value='xyz') as xx:
            assert pool.apply(inner, args=[]) == 'xyz'
            xx.assert_called_once()


# Outcome reported by the author: test_local, test_mock and (presumably)
# test_mp pass, while test_mp_mock fails with the output that follows.
=================================== FAILURES ===================================
_________________________________ test_mp_mock _________________________________
def test_mp_mock():
with multiprocessing.Pool() as pool:
with mock.patch('test_mp.sub', return_value='xyz') as xx:
> assert pool.apply(inner,args=[])=='xyz'
E AssertionError: assert 'abc' == 'xyz'
E - abc
E + xyz
projects/mfhealth/test_mp.py:25: AssertionError

更新:
在https://medium.com/uckey/how-mock-patch-decorator-works-in-python-37acd8b78ae的基础上,在sharedmock https://github.com/elritsch/python-sharedmock的帮助下,我能够更进一步,但仍未完成。
我将test_mp.py扩展为
from sharedmock.mock import SharedMock
def inner2(sm):
    """Patch ``test_mp.sub`` with *sm* and return the result of ``inner()``.

    Intended to be run inside a pool worker so that the patch is activated
    in the process that actually executes ``inner``.

    Args:
        sm: Replacement object installed in place of ``test_mp.sub`` for the
            duration of the call.

    Returns:
        Whatever ``inner()`` returns while the patch is active.
    """
    with mock.patch('test_mp.sub', sm):
        return inner()
def test_mp_smock():
    """Run ``inner2`` in a pool worker with a ``SharedMock`` standing in for ``sub``.

    The same ``SharedMock`` is patched in locally and also passed to the
    worker, and the call count is then checked from this process.
    """
    with multiprocessing.Pool() as pool:
        sm = SharedMock()
        sm.return_value = "xyz"
        with mock.patch('test_mp.sub', sm) as xx:
            # inner2 re-applies the patch inside the worker using the mock
            # it receives as an argument.
            assert pool.apply(inner2, args=[sm]) == 'xyz'
            assert xx.call_count == 1
def test_mp_mock2():
    """Same experiment as ``test_mp_smock`` but with a plain ``mock.Mock``.

    The mock is passed to the worker through ``pool.apply``'s ``args``, so it
    has to be sent to another process.
    """
    with multiprocessing.Pool() as pool:
        sm = mock.Mock()
        sm.return_value = "xyz"
        # Debug output showing what ``sub`` refers to before and after patching.
        print(f"before patch {sub}, {locals()}")
        with mock.patch('test_mp.sub', sm) as xx:
            print(f"after patch {sub}")
            assert pool.apply(inner2, args=[sm]) == 'xyz'
            assert xx.call_count == 1


# The author reports the following results for these two tests:
test_mp_smock 成功完成。
test_mp_mock2 失败,错误为 _pickle.PicklingError: Can't pickle <class 'mock.mock.Mock'>: it's not the same object as mock.mock.Mock。
test_mp_smock 的主要缺点是必须引入一个新的包装函数 inner2,才能通过 mock.patch 激活补丁。有没有办法在不引入包装函数 inner2 的情况下,把 test_mp_smock 中的补丁传播到被测代码?因为我无法改写被测代码中的 pool.apply(inner,args=[]) 调用。

发布于 2020-12-29 22:20:45
终于,我让它起作用了。
SharedMock 不如 Mock 灵活,例如缺少 assert_called_once_with 等方法,但可以设置返回值,也可以检查调用次数及调用参数。
import multiprocessing
import mock
from sharedmock.mock import SharedMock
def inner():
    """Return the result of ``sub(x="xxx")``.

    ``sub`` is resolved from the module globals on every call, so a patched
    ``test_mp.sub`` is what gets called while a patch is active.
    """
    return sub(x="xxx")
def sub(x=""):
    """Return the fixed string ``"abc"``.

    Args:
        x: Accepted but ignored; it exists so the test can check which
            keyword argument the caller passed.

    Returns:
        The string ``"abc"``.
    """
    # Plain literal: the original f-string had no placeholders.
    return "abc"
def fun_under_test():
    """Run ``inner`` in a pool worker and assert that it returned ``'xyz'``.

    ``'xyz'`` is not what the unpatched ``sub`` returns, so this only passes
    when ``sub`` has been replaced in a way the worker can see.
    """
    with multiprocessing.Pool() as pool:
        assert pool.apply(inner, args=[]) == 'xyz'
def test_final():
    """Patch ``test_mp.sub`` with a ``SharedMock`` and run ``fun_under_test``.

    Unlike the earlier attempts, the patch is applied before the pool is
    created inside ``fun_under_test`` and no wrapper function is needed.
    """
    sm = SharedMock()
    sm.return_value = "xyz"
    with mock.patch('test_mp.sub', sm) as xx:
        fun_under_test()
        assert xx.call_count == 1  # number of calls of sub function
        # value of parameters, i.e. sub(x="xxx")
        assert xx.mock_calls[0][2]['x'] == "xxx"


# Source: https://stackoverflow.com/questions/65492977
复制相似问题