首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何从faust应用程序向Websocket发送数据

如何从faust应用程序向Websocket发送数据
EN

Stack Overflow用户
提问于 2019-03-01 04:50:47
回答 1查看 1.3K关注 0票数 2

我现在正在做一个用例,使用Kafka和robinhood的faust来处理来自Kafka的数据。我已经成功地完成了计算,我需要的结果正在打印到我的faust worker运行的控制台上。

现在,我想要找到一种方法,使我的结果不仅在控制台中,而且在HTML页面中可见。我看过websockets库,但我不能让它与faust一起工作。我得到的错误是Crashed reason=RuntimeError('This event loop is already running'),我认为这是因为代码是为正在处理的每条消息执行的。

任何帮助都是非常感谢的

这是我使用的代码:

代码语言:javascript
复制
    import faust, datetime, websockets, asyncio

app = faust.App(
    'UseCase',
    broker='kafka://localhost:29092',
)

usecase_topic = app.topic('usecase',partitions=8)

usecase_table = app.Table('usecase', default=int)

checkfailure = {}

@app.agent(usecase_topic)
async def process_record(records):
    async for record in records:
        #count records for each Sensor
        print(record)
        sensor = record['ext_id']
        usecase_table[sensor] += 1
        #print(f'Records for Sensor {sensor}: {usecase_table[sensor]}')

        #write current timestamp of record and previous timestamp for each sensor to usecase_table dict
        currtime_id = record['ext_id']+'c'
        prevtime_id = record['ext_id']+'p'
        usecase_table[currtime_id] = datetime.datetime.strptime(record['tag_tsp'], "%Y%m%d%H%M%S.%f")

        #print current time
        print(f'Current time for Sensor {sensor}: {usecase_table[currtime_id]}')


        #calculate and print timestamp delta; if no previous value is given print message
        if usecase_table[prevtime_id] == 0:
            print(f'no previous timestamp for sensor {sensor}')
        else:
            usecase_table[prevtime_id] = datetime.datetime.strptime(usecase_table[prevtime_id], "%Y%m%d%H%M%S.%f")
            print(f'previous time for Sensor {sensor}: {usecase_table[prevtime_id]}')
            tsdelta = usecase_table[currtime_id] - usecase_table[prevtime_id]
            tsdelta_id = record['ext_id']+'t'
            usecase_table[tsdelta_id] = str(tsdelta)
            print(f'Sensor: {sensor} timestamp delta: {usecase_table[tsdelta_id]}')

        #calculate value delta
        currvalue_id = record['ext_id']+'cv'
        prevvalue_id = record['ext_id']+'pv'
        usecase_table[currvalue_id] = record['tag_value_int']

        print(f'current value for Sensor {sensor}: {usecase_table[currvalue_id]}')

        if usecase_table[prevvalue_id] == 0:
            print(f'no previous record for sensor {sensor}')
        else:
            print(f'previous value for Sensor {sensor}: {usecase_table[prevvalue_id]}')
            vdelta = usecase_table[currvalue_id] - usecase_table[prevvalue_id]
            vdelta_id = record['ext_id']+'v'
            usecase_table[vdelta_id] = vdelta
            print(f'Sensor: {sensor} value delta:{usecase_table[vdelta_id]}')

        #calculate cycle time
        if usecase_table[prevtime_id] != 0 and usecase_table[prevvalue_id] != 0 and usecase_table[vdelta_id] != 0:
            cycletime = tsdelta / usecase_table[vdelta_id]
            cyclemsg = f'Sensor {sensor}; Cycletime {cycletime}'
            print(cyclemsg)

        #add timestamp to checkfailure dict
        checkfailure[sensor] = datetime.datetime.strptime(record['tag_tsp'], "%Y%m%d%H%M%S.%f")
        #check if newest timestamp for a sensor is older than 10 secs
        for key in checkfailure:
            if datetime.datetime.now() - checkfailure[key] >= datetime.timedelta(seconds=10):
                failuremsg = f'Error: Sensor {key}'
                print(failuremsg)

        #send results to websocket
        async def send_result(websocket,path):
            results = cyclemsg + failuremsg
            await websockets.send(results)
        start_server = websockets.serve(send_result, '127.0.0.1', 5678)
        asyncio.get_event_loop().run_until_complete(start_server)

        #set previous value and timestamp to current
        usecase_table[prevtime_id] = record['tag_tsp']
        usecase_table[prevvalue_id] = record['tag_value_int']
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-04-02 04:31:18

被以下asyncio错误消息混淆是很正常的:)

不能从async def函数调用loop.run_until_complete

您需要做的是在后台启动websocket服务器。这应该很简单,而且它使用的是asyncio.ensure_future,但是您还希望您的websocket服务器在应用程序退出时正常关闭。

因此Faust使用"services",你可以为你的websocket服务器定义一个服务:

代码语言:javascript
复制
import faust
import websockets
from mode import Service
from websockets.exceptions import ConnectionClosed
from websockets.server import WebSocketServerProtocol


class App(faust.App):

   def on_init(self):
       self.websockets = Websockets(self)

   async def on_start(self):
       await self.add_runtime_dependency(self.websockets)



class Websockets(Service):

    def __init__(self, app, bind: str = 'localhost', port: int = 9999, **kwargs):
        self.app = app
        self.bind = bind
        self.port = port
        super().__init__(**kwargs)

    async def on_message(self, ws, message):
        ...

    async def on_messages(self,
                          ws: WebSocketServerProtocol,
                          path: str) -> None:
        try:
            async for message in ws:
                await self.on_message(ws, message)
        except ConnectionClosed:
            await self.on_close(ws)
        except asyncio.CancelledError:
            pass

    async def on_close(self, ws):
        # called when websocket socket is closed.
        ...

    @Service.task
    def _background_server(self):
         await websockets.serve(self.on_messages, self.bind, self.port)

app = App('UseCase')
# [...]
票数 9
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54934058

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档