我正在尝试通过Cloudera Data Science Workbench在我们内部的Hadoop集群上实现GitHub项目(https://github.com/tomatoTomahto/CDH-Sensor-Analytics)。
在Cloudera Data Science Workbench上运行该项目时,当尝试通过Python api KafkaProducer(bootstrap_servers='broker1:9092') Code can be found in [https://github.com/tomatoTomahto/CDH-Sensor-Analytics/blob/master/datagenerator/KafkaConnection.py]连接到Kafka时,我收到错误消息"No Brokers“。
我已经使用Kerberos进行了身份验证。我已经尝试给出没有端口号的代理节点,也可以作为一个列表。但是,到目前为止,什么都没有奏效。
下面是堆栈跟踪。
NoBrokersAvailable: NoBrokersAvailable
NoBrokersAvailable Traceback (most recent call
last)
in engine
----> 1 dgen = DataGenerator(config)
/home/cdsw/datagenerator/DataGenerator.py in __init__(self, config)
39
40 self._kudu = KuduConnection(self._config['kudu_master'],
self._config['kudu_port'], spark)
---> 41 self._kafka =
KafkaConnection(self._config['kafka_brokers'],
self._config['kafka_topic'])
42
43 #self._kafka
/home/cdsw/datagenerator/KafkaConnection.py in __init__(self, brokers,
topic)
4 class KafkaConnection():
5 def __init__(self, brokers, topic):
----> 6 self._kafka_producer =
KafkaProducer(bootstrap_servers=brokers)
7 self._topic = topic
8
/home/cdsw/.local/lib/python3.6/site-packages/kafka/producer/kafka.py
in __init__(self, **configs)
333
334 client = KafkaClient(metrics=self._metrics,
metric_group_prefix='producer',
--> 335 **self.config)
336
337 # Get auto-discovered version from client if necessary
/home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in
__init__(self, **configs)
208 if self.config['api_version'] is None:
209 check_timeout =
self.config['api_version_auto_timeout_ms'] / 1000
--> 210 self.config['api_version'] =
self.check_version(timeout=check_timeout)
211
212 def _bootstrap(self, hosts):
/home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in
check_version(self, node_id, timeout, strict)
806 try_node = node_id or self.least_loaded_node()
807 if try_node is None:
--> 808 raise Errors.NoBrokersAvailable()
809 self._maybe_connect(try_node)
810 conn = self._conns[try_node]
NoBrokersAvailable: NoBrokersAvailable我还尝试通过具有VPN连接的CLI在工作台之外进行连接。我得到了同样的错误。
对于我错过了什么有什么建议吗?提前感谢!
发布于 2017-10-14 02:38:24
第一步是确定网络路由是否打开,代理是否已启动并侦听该端口。之后,您可以检查身份验证等。
你有没有试过telnet <broker host> 9092
除了listeners之外,您可能还需要显式地设置advertised.listeners,我偶尔会看到Java有一个奇怪的问题,它没有绑定到预期的网络接口(或者至少是我期望的网络接口!)我不得不使用advertised.listeners来强制它。
https://stackoverflow.com/questions/45683021
复制相似问题