首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink Kafka Table API for python with JAAS

Flink Kafka Table API for python with JAAS
EN

Stack Overflow用户
提问于 2020-11-09 11:32:03
回答 2查看 252关注 0票数 0

我正在使用Flink 1.11.2版本的Python Table API通过SASL协议连接到Kafka主题,但它失败了,错误如下。我在Flink java版本中尝试了相同的属性,并且我能够连接。有没有人遇到过这个问题,你是如何解决的?

代码语言:javascript
复制
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule

设置:

代码语言:javascript
复制
kafka = Kafka()\
        .version("universal") \
        .topic("test_topic")\
        .property("group.id", "consumer_group")\
        .property("security.protocol", "SASL_PLAINTEXT")\
        .property("sasl.mechanism", "PLAIN")\
        .property("bootstrap.servers",
                  "<remoteIP>:9093")\
        .property("sasl.jaas.config",
                  "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" "
                  "password=\"abc\";")\
        .start_from_latest()
EN

回答 2

Stack Overflow用户

发布于 2021-09-28 11:03:43

我遇到了这个错误,不是在Python API中,而是在JVM中(我确实认为这是同一个问题),通过显式地向kafka-clients (撰写本文时为2.8.0)添加依赖项解决了这个问题。

票数 0
EN

Stack Overflow用户

发布于 2021-11-16 18:12:20

我知道这个问题已经非常老了,但是在决定如何在没有org.apache.kafka.common.security.plain.PlainLoginModule.的情况下为SASL进行配置时也有一个问题。

最终通过以下配置使其工作,并将其张贴在此处以供参考(通过SQL连接器指令使用python中的Table API ):

代码语言:javascript
复制
  KAFKA_SERVERS = os.getenv('KAFKA_BS_SERVERS',"kafka-0-external:9094,kafka-1- external:9094,kafka-2-external:9094,kafka-3-external:9094,kafka-4- external:9094").split(',')
  KAFKA_USERNAME = "XXX"
  KAFKA_PASSWORD = "pass"
  KAFKA_SOURCE_TOPIC = 'topic'

  source_ddl = f"""
            CREATE TABLE source_table(
                entry STRING
            ) WITH (
              'connector' = 'kafka',
              'topic' = '{KAFKA_SOURCE_TOPIC}',
              'properties.bootstrap.servers' = '{','.join(KAFKA_SERVERS)}',
              'properties.group.id' = 'testgroup12',
              'properties.sasl.mechanism' = 'PLAIN',
              'properties.security.protocol' = 'SASL_PLAINTEXT',
              'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'raw'
            )
            """

  env = StreamExecutionEnvironment.get_execution_environment()
  env.add_jars("file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")
  settings = EnvironmentSettings.new_instance()\
                      .in_streaming_mode()\
                      .use_blink_planner()\
                      .build()

  t_env = StreamTableEnvironment.create(stream_execution_environment= env, environment_settings=settings)
  t_env.execute_sql(source_ddl).wait()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64745533

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档