首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >已在python中的异步客户端测试中使用的地址

已在python中的异步客户端测试中使用的地址
EN

Stack Overflow用户
提问于 2020-11-06 11:42:46
回答 1查看 91关注 0票数 0

我有一个关于异步客户端测试的问题。下面是我的测试代码

代码语言:javascript
复制
class TestSocketClient:
    @classmethod
    def setup_class(cls):
        # enable parallel testing by adding the pytest worker id number to the default port
        worker_id = os.environ.get("PYTEST_XDIST_WORKER", "gw0")
        worker_number = int(worker_id[2:])

        cls.mock_server = ServerMock()
        cls.mock_server.port = ServerMock.port + worker_number

        cls.username = os.environ.get("USERNAME", "")
        cls.password = os.environ.get("PASSWORD", "")
        cls.socket_client = SocketClient(username=cls.username, password=cls.password)
        cls.socket_client.hostname = "0.0.0.0"
        cls.socket_client.port = SocketClient.port + worker_number

    @pytest.mark.asyncio
    async def test_login(self):
        await self.mock_server.start()
        response = await self.socket_client.login()
        assert response == actual_response
        await self.socket_client.close()

    @pytest.mark.asyncio
    async def test_send_heartbeat(self):
        await self.mock_server.start()
        await self.socket_client.login()
        await self.socket_client.send_heartbeat()
        await self.socket_client.close()

我可以在TestSocketClient下单独运行测试,它们将单独通过。但是,当我将测试套件与pytest -n auto一起运行时,后一个测试用例将引发error while attempting to bind on address ('0.0.0.0', 2056): address already in use。我的问题是,如何在不解决分配问题的情况下使测试套件通过,以便它们可以在CI过程中成功运行。如果在编写异步测试时有一些更有价值的建议,我将非常感激(例如,如果我只想测试客户端想要发送到服务器的请求,我应该断言什么?我应该断言在服务器端接收到的消息,还是只在客户端编写类似assert_called_once的代码)。提前感谢!

更新:

我最终在不同的测试中解决了端口增量的问题,如下所示

代码语言:javascript
复制
class TestSocketClient:
    ports_taken = set()

    @classmethod
    def setup_class(cls):

        cls.mock_server = ServerMock()

        cls.username = os.environ.get("USERNAME", "")
        cls.password = os.environ.get("PASSWORD", "")
        cls.socket_client = SocketClient(username=cls.username, password=cls.password)
        cls.socket_client.hostname = "0.0.0.0"
        cls.socket_client.port = cls.mock_server.port
     
     def bump(self):
        if len(self.ports_taken) == 0:
           self.ports_taken.add(self.mock_server.port)
        new_port = max(self.ports_taken) + 1
        self.mock_server.port = new_port
        self.socket_client.port = new_port
        self.ports_taken.add(self.mock_server.port)
     
     async def start(self):
        self.bump()
        try:
           await self.mock_server.start()
        except:
           self.bump()
           await self.mock_server.start()
     
     @pytest.mark.asyncio
     async def test_login(self):
         await self.start()
         ...

希望这能对你有所帮助!

EN

回答 1

Stack Overflow用户

发布于 2020-11-06 15:37:49

对于address already in use,通过以下方式检查当前正在运行的进程

代码语言:javascript
复制
ps -ax | grep <your app/app port>

在这里,您将注意到该进程已经存在。请给我。使用sudo kill -9 <process_id>终止该进程

现在重新启动您的服务。我对异步测试不是很确定。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64708547

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档