首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >kafka-python引发UnrecognizedBrokerVersion错误

kafka-python引发UnrecognizedBrokerVersion错误
EN

Stack Overflow用户
提问于 2019-10-31 17:15:04
回答 2查看 12.2K关注 0票数 8

在使用kafka-python包构造KafkaProducer时,我得到了这个错误:

代码语言:javascript
复制
[ERROR] UnrecognizedBrokerVersion: UnrecognizedBrokerVersion
Traceback (most recent call last):
  File "/var/lang/lib/python3.7/imp.py", line 234, in load_module
    return load_source(name, filename, file)
  File "/var/lang/lib/python3.7/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 696, in _load
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/var/task/kafka/producer/kafka.py", line 381, in __init__
    **self.config)
  File "/var/task/kafka/client_async.py", line 240, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/var/task/kafka/client_async.py", line 908, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "/var/task/kafka/conn.py", line 1228, in check_version
    raise Errors.UnrecognizedBrokerVersion()

代码如下:

代码语言:javascript
复制
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=os.environ.get('KAFKA_HOST', 'localhost:9092'))

我使用的是Python 3.7和AWS MSK集群。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-10-31 17:49:46

只需将security_protocol="SSL"添加到KafkaProducer即可解决此问题,如下所示:

代码语言:javascript
复制
from kafka import KafkaProducer
producer = KafkaProducer(security_protocol="SSL", bootstrap_servers=os.environ.get('KAFKA_HOST', 'localhost:9092'))
票数 16
EN

Stack Overflow用户

发布于 2021-02-09 07:45:05

kafka-python库方法有很多可选参数。以下是我能够监听Azure服务器的最新工作脚本:

代码语言:javascript
复制
import os, kafka  # pip install kafka-python

consumer = kafka.KafkaConsumer("test-topic",
    bootstrap_servers=["test-server.servicebus.windows.net:9093"],
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    group_id="$Default",
    sasl_mechanism="PLAIN",
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
    sasl_plain_username="$ConnectionString",
    security_protocol="SASL_SSL",
    value_deserializer=lambda x: x.decode("utf-8"))

print(datetime.datetime.now())
for message in consumer:
    message_dict = json.loads(message.value)
    print(datetime.datetime.now())
    print("%s" % (json.dumps(message_dict,indent=4)))

它是粗糙的,但很有效。比来自https://kafka.apache.org/quickstart页面(需要各种配置文件和环境变量)的Java包装在bash中的Kafka演示消费者更简单。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58640045

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档