首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >具有两个协同线的asyncIO多线程服务器

具有两个协同线的asyncIO多线程服务器
EN

Stack Overflow用户
提问于 2018-07-26 21:52:55
回答 1查看 783关注 0票数 2

我正在用Python3编写一个服务器,它会截图并通过websockets发送。我有处理连接的协同线,我想创建另一个用于在某个时间间隔拍摄屏幕快照的协同线。截图协同器可能会在不同的线程中运行,我需要将结果传播到具有读写锁的共享变量中,这样才能发送它。我的问题是:(如果可能的话,结果应该是多平台的)

  1. 怎么可能安排这样的任务呢?我创建了一个永远运行的服务器,我可以创建周期性的协同,但不知怎么我不能把它们放在一个循环中。
  2. 从一个线程(如果服务器是单线程)向另一个线程传播结果的好方法是什么?

我发现这段代码与此类似,我无法让它工作(第二个协同器不执行)。有人可以纠正这与不多线程吗?

代码语言:javascript
复制
async def print_var():
    global number
    await asyncio.sleep(2)
    print(number)


async def inc_var():
    global number
    await asyncio.sleep(5)
    number += 1

number = 0

asyncio.get_event_loop().run_until_complete(print_var())
asyncio.async(inc_var)
asyncio.get_event_loop().run_forever()

答疑编辑

最后,经过更多小时的谷歌搜索,我实际上让它在一个线程上工作,所以没有种族状况的危险。(但我仍然不知道ensure_future做了什么,为什么没有在事件循环中调用它。)

代码语言:javascript
复制
users = set()

def register(websocket):
    users.add(websocket)

def unregister(websocket):
    users.remove(websocket)

async def get_screenshot():
    global screenshot
    while True:
        screenshot = screenshot()
        await asyncio.sleep(0.2)

async def server(websocket, path):
    global screenshot
    register(websocket)
    try:
        async for message in websocket:
            respond(screenshot)
    finally:
        unregister(websocket)

def main():
    asyncio.get_event_loop().run_until_complete(
        websockets.serve(server, 'localhost', 6789))
    asyncio.ensure_future(get_screenshot())
    asyncio.get_event_loop().run_forever()

main()
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-07-27 11:56:42

在Python 3.7中:

代码语言:javascript
复制
import asyncio

import websockets

CAPTURE_INTERVAL = 1
running = True
queues = set()


async def handle(ws, path):
    queue = asyncio.Queue()
    queues.add(queue)
    while running:
        data = await queue.get()
        if not data:
            break
        await ws.send(data)


def capture_screen():
    # Do some work here, preferably in C extension without holding the GIL
    return b'screenshot data'


async def main():
    global running
    loop = asyncio.get_running_loop()
    server = await websockets.serve(handle, 'localhost', 8765)
    try:
        while running:
            data = await loop.run_in_executor(None, capture_screen)
            for queue in queues:
                queue.put_nowait(data)
            await asyncio.sleep(CAPTURE_INTERVAL)
    finally:
        running = False
        for queue in queues:
            queue.put_nowait(None)
        server.close()
        await server.wait_closed()


if __name__ == '__main__':
    asyncio.run(main())

请注意,这只是为了演示生产者-消费者的粉丝模式.queues并不是必需的--您可以直接将data发送到main()中的所有server.sockets,而在handle()中,您应该担心传入的websocket消息。例如,客户端可以像这样控制图像压缩速率:

代码语言:javascript
复制
import asyncio

import websockets

CAPTURE_INTERVAL = 1
DEFAULT = b'default'
qualities = {}


async def handle(ws, path):
    try:
        async for req in ws:
            qualities[ws] = req
    finally:
        qualities.pop(ws, None)


def capture_screen():
    # Do some work here, preferably in C extension without holding the GIL
    return {
        DEFAULT: b'default screenshot data',
        b'60': b'data at 60% quality',
        b'80': b'data at 80% quality',
    }


async def main():
    loop = asyncio.get_running_loop()
    server = await websockets.serve(handle, 'localhost', 8765)
    try:
        while True:
            data = await loop.run_in_executor(None, capture_screen)
            for ws in server.sockets:
                quality = qualities.get(ws, DEFAULT)
                if quality not in data:
                    quality = DEFAULT
                asyncio.create_task(ws.send(data[quality]))
            await asyncio.sleep(CAPTURE_INTERVAL)
    finally:
        server.close()
        await server.wait_closed()


if __name__ == '__main__':
    asyncio.run(main())
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51548025

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档