我试着在豆荚上消费卡夫卡信息,但我在证书上遇到了一些问题.我尝试了这个在pod上运行java的bash脚本:
$ wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
$ tar -xzf kafka_2.13-3.2.0.tgz
$ cd kafka_2.13-3.2.0/bin
$ echo "security.protocol=SSL" > my.properties
$ ./kafka-topics.sh --describe --topic pingone_fraud --bootstrap-server <host>:9093 --command-config my.properties
# output:
Topic: <topic> PartitionCount: 8 ReplicationFactor: 3 Configs: cleanup.policy=delete,segment.bytes=262144000,retention.bytes=53687063712
Topic: <topic> Partition: 0 Leader: 204 Replicas: 204,203,205 Isr: 204,203,205
.
.
.正如您可以从输出中看到的那样,它没有出现问题!但是,当我试图使用python代码使用我的消息时:
consumer = KafkaConsumer(<topic>,
security_protocol='SSL',
ssl_cafile='/keys/CARoot.pem',
request_timeout_ms=_request_timeout_ms,
connections_max_idle_ms=_connections_max_idle_ms,
bootstrap_servers='<host>:9093',
group_id='fraud',
auto_offset_reset=_auto_offset_reset,
max_poll_interval_ms=_max_poll_interval_ms,
session_timeout_ms=_session_timeout_ms,
max_poll_records=_max_poll_records)
print(f'************* {consumer.topics()}')我明白这一例外:
Traceback (most recent call last):
File "/tmp/kafka_consumer.py", line 30, in <module>
max_poll_records=_max_poll_records)
File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 354, in __init__
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 240, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 908, in check_version
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
File "/usr/local/lib/python3.6/site-packages/kafka/conn.py", line 1171, in check_version
if not self.connect_blocking(timeout_at - time.time()):
File "/usr/local/lib/python3.6/site-packages/kafka/conn.py", line 333, in connect_blocking
self.connect()
File "/usr/local/lib/python3.6/site-packages/kafka/conn.py", line 422, in connect
if self._try_handshake():
File "/usr/local/lib/python3.6/site-packages/kafka/conn.py", line 501, in _try_handshake
self._sock.do_handshake()
File "/usr/local/lib/python3.6/ssl.py", line 1077, in do_handshake
self._sslobj.do_handshake()
File "/usr/local/lib/python3.6/ssl.py", line 689, in do_handshake
self._sslobj.do_handshake()
ssl.SSLError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:852)现在,关于我使用的证书,在keytool中,我有一个从保险库提取的证书,我使用keytool导出它,这样我就可以在python代码中使用它了:
keytool -exportcert -keystore /opt/java/lib/security/cacerts -alias vault-us -rfc -file /keys/CARoot.pem我很难理解为什么我的python代码不能工作,因为我使用相同的证书时出现了证书错误.
有人知道吗?
发布于 2022-06-09 09:00:03
看起来卡夫卡在幕后做了一些奇怪的ssl上下文内容。我通过创建自己的ssl上下文并将其提供给我的kafka使用者来解决这个问题:
def create_ssl_context():
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_verify_locations('/keys/vault_us.pem')
return ssl_context
consumer = KafkaConsumer(_topic_name,
ssl_context=create_ssl_context(),
security_protocol='SSL',
request_timeout_ms=_request_timeout_ms,
connections_max_idle_ms=_connections_max_idle_ms,
bootstrap_servers='<host>:9093',
group_id='fraud',
auto_offset_reset=_auto_offset_reset,
max_poll_interval_ms=_max_poll_interval_ms,
session_timeout_ms=_session_timeout_ms,
max_poll_records=_max_poll_records)https://stackoverflow.com/questions/72556396
复制相似问题