首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >我可以用websocket值发送一个用Kafka主题更新的变量吗?

我可以用websocket值发送一个用Kafka主题更新的变量吗?
EN

Stack Overflow用户
提问于 2022-03-11 18:52:03
回答 1查看 101关注 0票数 0

问题:--我有一个对象属性.status,它是用一个Kafka主题更新的,然后通过websocket发送它。我的问题是,每次我从客户端(javascript)请求,服务器(websockets +Python中的异步)将从一开始就启动Kafka消费者。

问题:是否有可能让卡夫卡循环(for msg in consumer:)更新我的custom_obj对象并只在被要求时发送它的.status值?

我试过什么

到目前为止,我在服务器端的情况如下:

代码语言:javascript
复制
import asyncio
import websockets
from kafka import KafkaConsumer
import Custom_obj

async def test(websocket):
    consumer = KafkaConsumer(
    'kafka-topic', 
    bootstrap_servers=['kafka.server.com:1234'],
    auto_offset_reset='earliest', #Must start from the beginning to build the object correctly
    enable_auto_commit=True,
    )
    custom_obj = Custom_obj()
    for msg in consumer:
        msg_dec = msg.value.decode()
        custom_obj.update(msg_dec)
        await websocket.send(custom_obj.status) 

async def main():
    async with websockets.serve(test, "localhost", 1234):
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(main())

客户端( Vue组件中的javascript)代码:

代码语言:javascript
复制
created() {
  const ws = new WebSocket('ws://localhost:1234');
  ws.onopen = function(e) {
    ws.send('Got here!')
    this.connectionStatus = 'Connected.'
  }
  ws.onerror = function(e) {
    ws.close()
  }
  ws.onclose = function(e) {
    this.connectionStatus = 'Disconnected.'
  }
  ws.onmessage = (e) => {
    console.log(1)
  }
EN

回答 1

Stack Overflow用户

发布于 2022-03-12 02:16:37

如果要跟踪卡夫卡主题的进度,则需要在构造函数中使用group_id参数

您还可能需要查看aiokafka以获得额外的异步支持。

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71443384

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档