艾奥赫特已经内置了对websockets的支持。它很简单,工作也很好。
文档中示例的一个简化版本是:
async def handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
# Async iterate the messages the client sends
async for message in ws:
ws.send_str('You sent: %s' % (message.data,))
print('websocket connection closed')在本例中,ws是对与客户端的websocket连接的引用。我可以很容易地将这些引用放入request.app中,比如@Crandel在这里做 (即全局状态),但不能放在生产应用程序中,因为每个应用服务器(甚至每个工作人员)都会有自己的app实例。
这有一个公认的模式吗?还有别的办法吗?
注:我指的不是会议。我指的是联系。当服务器B中的应用程序代码中发生事件时,我想向连接到服务器A的客户端发送一条消息。
发布于 2016-03-11 17:01:40
最新情况(2017年2月)
通道(幸运的是)没有合并到Django中。它可能仍然是一个伟大的项目,但它并不真正属于Django本身。
此外,我强烈建议看一看Postgres相对新的、内置的对酒吧/潜艇的支持。它将可能比其他任何事表现都要好,并在aiohttp之上构建一个自定义解决方案,使用Postgres作为支持服务,这可能是您的最佳选择。
原创
虽然Django频道不是aiohttp,但它很可能被合并到Django 1.10中,它以非常直观的方式解决了这个问题,它是由Django 迁徙的作者安德鲁·戈德温编写的。
Django通道通过在Django应用程序前面创建路由层来抽象“多服务器上的许多进程”的概念。该路由层使用后端(例如Redis)来维护进程之间的可共享状态,并使用新的阿斯吉协议来方便处理HTTP请求和WebSockets,同时将每个请求委托给各自的"消费者“(例如,附带一个内置的HTTP请求处理程序,您可以为WebSockets编写自己的消费者)。
Django通道有一个名为群组的概念,它处理问题的“广播”性质;也就是说,它允许发生在服务器上的事件触发消息到该组中的客户端,而不管它们是连接到相同的还是不同的进程或服务器。
IMHO,Django通道很可能被抽象到一个更通用的Python库中。有一个实现其他几个 Python库的围棋频道,但是,在本文撰写之时,没有什么值得注意的提供网络透明性的东西:通道在进程和服务器之间进行通信的能力。
发布于 2017-02-15 15:22:54
如果我对您的理解是正确的,您希望拥有多个websocket服务器,每个服务器都有多个客户端连接,但您希望能够与所有连接的客户端进行潜在的通信。
下面是一个示例,它创建了三个简单的服务器--大写回声、随机引用和一天中的时间--然后向所有连接的客户端发送广播消息。也许这里面有一些有用的想法。
巴斯丁:https://pastebin.com/xDSACmdV
#!/usr/bin/env python3
"""
Illustrates how to have multiple websocket servers running and send
messages to all their various clients at once.
In response to stackoverflow question:
https://stackoverflow.com/questions/35820782/how-to-manage-websockets-across-multiple-servers-workers
Pastebin: https://pastebin.com/xDSACmdV
"""
import asyncio
import datetime
import random
import time
import webbrowser
import aiohttp
from aiohttp import web
__author__ = "Robert Harder"
__email__ = "rob@iharder.net"
__license__ = "Public Domain"
def main():
# Create servers
cap_srv = CapitalizeEchoServer(port=9990)
rnd_srv = RandomQuoteServer(port=9991)
tim_srv = TimeOfDayServer(port=9992)
# Queue their start operation
loop = asyncio.get_event_loop()
loop.create_task(cap_srv.start())
loop.create_task(rnd_srv.start())
loop.create_task(tim_srv.start())
# Open web pages to test them
webtests = [9990, 9991, 9991, 9992, 9992]
for port in webtests:
url = "http://www.websocket.org/echo.html?location=ws://localhost:{}".format(port)
webbrowser.open(url)
print("Be sure to click 'Connect' on the webpages that just opened.")
# Queue a simulated broadcast-to-all message
def _alert_all(msg):
print("Sending alert:", msg)
msg_dict = {"alert": msg}
cap_srv.broadcast_message(msg_dict)
rnd_srv.broadcast_message(msg_dict)
tim_srv.broadcast_message(msg_dict)
loop.call_later(17, _alert_all, "ALL YOUR BASE ARE BELONG TO US")
# Run event loop
loop.run_forever()
class MyServer:
def __init__(self, port):
self.port = port # type: int
self.loop = None # type: asyncio.AbstractEventLoop
self.app = None # type: web.Application
self.srv = None # type: asyncio.base_events.Server
async def start(self):
self.loop = asyncio.get_event_loop()
self.app = web.Application()
self.app["websockets"] = [] # type: [web.WebSocketResponse]
self.app.router.add_get("/", self._websocket_handler)
await self.app.startup()
handler = self.app.make_handler()
self.srv = await asyncio.get_event_loop().create_server(handler, port=self.port)
print("{} listening on port {}".format(self.__class__.__name__, self.port))
async def close(self):
assert self.loop is asyncio.get_event_loop()
self.srv.close()
await self.srv.wait_closed()
for ws in self.app["websockets"]: # type: web.WebSocketResponse
await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown')
await self.app.shutdown()
await self.app.cleanup()
async def _websocket_handler(self, request):
assert self.loop is asyncio.get_event_loop()
ws = web.WebSocketResponse()
await ws.prepare(request)
self.app["websockets"].append(ws)
await self.do_websocket(ws)
self.app["websockets"].remove(ws)
return ws
async def do_websocket(self, ws: web.WebSocketResponse):
async for ws_msg in ws: # type: aiohttp.WSMessage
pass
def broadcast_message(self, msg: dict):
for ws in self.app["websockets"]: # type: web.WebSocketResponse
ws.send_json(msg)
class CapitalizeEchoServer(MyServer):
""" Echoes back to client whatever they sent, but capitalized. """
async def do_websocket(self, ws: web.WebSocketResponse):
async for ws_msg in ws: # type: aiohttp.WSMessage
cap = ws_msg.data.upper()
ws.send_str(cap)
class RandomQuoteServer(MyServer):
""" Sends a random quote to the client every so many seconds. """
QUOTES = ["Wherever you go, there you are.",
"80% of all statistics are made up.",
"If a tree falls in the woods, and no one is around to hear it, does it make a noise?"]
def __init__(self, interval: float = 10, *kargs, **kwargs):
super().__init__(*kargs, **kwargs)
self.interval = interval
async def do_websocket(self, ws: web.WebSocketResponse):
async def _regular_interval():
while self.srv.sockets is not None:
quote = random.choice(RandomQuoteServer.QUOTES)
ws.send_json({"quote": quote})
await asyncio.sleep(self.interval)
self.loop.create_task(_regular_interval())
await super().do_websocket(ws) # leave client connected here indefinitely
class TimeOfDayServer(MyServer):
""" Sends a message to all clients simultaneously about time of day. """
async def start(self):
await super().start()
async def _regular_interval():
while self.srv.sockets is not None:
if int(time.time()) % 10 == 0: # Only on the 10 second mark
timestamp = "{:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now())
self.broadcast_message({"timestamp": timestamp})
await asyncio.sleep(1)
self.loop.create_task(_regular_interval())
if __name__ == "__main__":
main()发布于 2016-03-05 22:47:03
因此,我只熟悉Node中的Socket.IO,但是使用Socket.IO水平扩展websockets是相当容易的。
套接字可以随会话一起提供,因此每个会话都由特定的服务器管理。这样就可以轻松地保存每个打开的套接字的状态,并在所有服务器之间实现负载平衡。
下面是Python的SocketIO:
https://pypi.python.org/pypi/socketIO-client
以下是关于如何将会话附加到redis商店以使其更快和跨服务器的负载平衡更易于管理的一个非常好的阅读。
如何与Socket.IO 1.x和Express 4.x共享会话?
我知道这不能回答你关于aiohttp的问题,但希望这能让您更好地了解套接字是如何工作的。
编辑:用Node编写-
在Socket.IO中,这真的很容易,它有大量的函数以各种不同的方式广播消息。
对于您的例子,如果您想向每个聊天室的每个人发送一条消息。例如,每个打开套接字的人都可以轻松地编写。
socket.broadcast.emit('WARNING', "this is a test");假设您有开放的房间,您只能通过一个名为.to()的简单函数向房间中的人广播消息。例如,我有一个名为“烧烤”的房间:
socket.broadcast.to('BBQ').emit('invitation', 'Come get some food!');这将给大家在烧烤频道的信息--来拿点吃的!
编辑:编辑:
对于Socket.IO的工作方式来说,这是一个非常棒的写作,确保您阅读了更新后的函数版本的第二个答案。它比它们的文档容易得多。
据我所知,这也是python实现中的全部工作方式。为了方便使用,我肯定会将它用于websockets。aiohttp看起来非常强大,但要么没有这种功能,要么隐藏在文档中,要么只在代码中编写,还没有任何文档。
https://stackoverflow.com/questions/35820782
复制相似问题