我正在使用Flink 1.11.2版本的Python Table API通过SASL协议连接到Kafka主题,但它失败了,错误如下。我在Flink java版本中尝试了相同的属性,并且我能够连接。有没有人遇到过这个问题,你是如何解决的?
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule设置:
kafka = Kafka()\
.version("universal") \
.topic("test_topic")\
.property("group.id", "consumer_group")\
.property("security.protocol", "SASL_PLAINTEXT")\
.property("sasl.mechanism", "PLAIN")\
.property("bootstrap.servers",
"<remoteIP>:9093")\
.property("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" "
"password=\"abc\";")\
.start_from_latest()发布于 2021-09-28 11:03:43
我遇到了这个错误,不是在Python API中,而是在JVM中(我确实认为这是同一个问题),通过显式地向kafka-clients (撰写本文时为2.8.0)添加依赖项解决了这个问题。
发布于 2021-11-16 18:12:20
我知道这个问题已经非常老了,但是在决定如何在没有org.apache.kafka.common.security.plain.PlainLoginModule.的情况下为SASL进行配置时也有一个问题。
最终通过以下配置使其工作,并将其张贴在此处以供参考(通过SQL连接器指令使用python中的Table API ):
KAFKA_SERVERS = os.getenv('KAFKA_BS_SERVERS',"kafka-0-external:9094,kafka-1- external:9094,kafka-2-external:9094,kafka-3-external:9094,kafka-4- external:9094").split(',')
KAFKA_USERNAME = "XXX"
KAFKA_PASSWORD = "pass"
KAFKA_SOURCE_TOPIC = 'topic'
source_ddl = f"""
CREATE TABLE source_table(
entry STRING
) WITH (
'connector' = 'kafka',
'topic' = '{KAFKA_SOURCE_TOPIC}',
'properties.bootstrap.servers' = '{','.join(KAFKA_SERVERS)}',
'properties.group.id' = 'testgroup12',
'properties.sasl.mechanism' = 'PLAIN',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
'scan.startup.mode' = 'latest-offset',
'format' = 'raw'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")
settings = EnvironmentSettings.new_instance()\
.in_streaming_mode()\
.use_blink_planner()\
.build()
t_env = StreamTableEnvironment.create(stream_execution_environment= env, environment_settings=settings)
t_env.execute_sql(source_ddl).wait()https://stackoverflow.com/questions/64745533
复制相似问题