问题:--我有一个对象属性.status,它是用一个Kafka主题更新的,然后通过websocket发送它。我的问题是,每次我从客户端(javascript)请求,服务器(websockets +Python中的异步)将从一开始就启动Kafka消费者。
问题:是否有可能让卡夫卡循环(for msg in consumer:)更新我的custom_obj对象并只在被要求时发送它的.status值?
我试过什么
到目前为止,我在服务器端的情况如下:
import asyncio
import websockets
from kafka import KafkaConsumer
import Custom_obj
async def test(websocket):
consumer = KafkaConsumer(
'kafka-topic',
bootstrap_servers=['kafka.server.com:1234'],
auto_offset_reset='earliest', #Must start from the beginning to build the object correctly
enable_auto_commit=True,
)
custom_obj = Custom_obj()
for msg in consumer:
msg_dec = msg.value.decode()
custom_obj.update(msg_dec)
await websocket.send(custom_obj.status)
async def main():
async with websockets.serve(test, "localhost", 1234):
await asyncio.Future() # run forever
if __name__ == "__main__":
asyncio.run(main())客户端( Vue组件中的javascript)代码:
created() {
const ws = new WebSocket('ws://localhost:1234');
ws.onopen = function(e) {
ws.send('Got here!')
this.connectionStatus = 'Connected.'
}
ws.onerror = function(e) {
ws.close()
}
ws.onclose = function(e) {
this.connectionStatus = 'Disconnected.'
}
ws.onmessage = (e) => {
console.log(1)
}发布于 2022-03-12 02:16:37
如果要跟踪卡夫卡主题的进度,则需要在构造函数中使用group_id参数
您还可能需要查看aiokafka以获得额外的异步支持。
https://stackoverflow.com/questions/71443384
复制相似问题