首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink SQL客户端连接到安全的kafka集群

Flink SQL客户端连接到安全的kafka集群
EN

Stack Overflow用户
提问于 2021-02-23 06:37:42
回答 1查看 610关注 0票数 1

我想对Flink SQL表执行一个查询,该查询由安全的kafka集群的kafka主题支持。我能够以编程方式执行查询,但无法通过Flink SQL客户端执行。我不确定如何通过Flink传递JAAS (java.security.auth.login.config)和其他系统属性。

Flink SQL查询以编程方式

代码语言:javascript
复制
 private static void simpleExec_auth() {

        // Create the execution environment.
        final EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .withBuiltInCatalogName(
                        "default_catalog")
                .withBuiltInDatabaseName(
                        "default_database")
                .build();

        System.setProperty("java.security.auth.login.config","client_jaas.conf");
        System.setProperty("sun.security.jgss.native", "true");
        System.setProperty("sun.security.jgss.lib", "/usr/libexec/libgsswrap.so");
        System.setProperty("javax.security.auth.useSubjectCredsOnly","false");

        TableEnvironment tableEnvironment = TableEnvironment.create(settings);
        String createQuery = "CREATE TABLE  test_flink11 ( " + "`keyid` STRING, " + "`id` STRING, "
                + "`name` STRING, " + "`age` INT, " + "`color` STRING, " + "`rowtime` TIMESTAMP(3) METADATA FROM 'timestamp', " + "`proctime` AS PROCTIME(), " + "`address` STRING) " + "WITH ( "
                + "'connector' = 'kafka', "
                + "'topic' = 'test_flink10', "
                + "'scan.startup.mode' = 'latest-offset', "
                + "'properties.bootstrap.servers' = 'kafka01.nyc.com:9092', "
                + "'value.format' = 'avro-confluent', "
                + "'key.format' = 'avro-confluent', "
                + "'key.fields' = 'keyid', "
                + "'value.fields-include' = 'EXCEPT_KEY', "
                + "'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.sasl.kerberos.kinit.cmd' = '/usr/local/bin/skinit --quiet', 'properties.sasl.mechanism' = 'GSSAPI', "
                + "'key.avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:5037', "
                + "'key.avro-confluent.schema-registry.subject' = 'test_flink6', "
                + "'value.avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:5037', "
                + "'value.avro-confluent.schema-registry.subject' = 'test_flink4')";
        System.out.println(createQuery);
        tableEnvironment.executeSql(createQuery);
        TableResult result = tableEnvironment
                .executeSql("SELECT name,rowtime FROM test_flink11");
        result.print();
    }

这很好用。

通过SQL client进行的SQL Flink查询

运行此操作将产生以下错误。

代码语言:javascript
复制
Flink SQL> CREATE TABLE test_flink11 (`keyid` STRING,`id` STRING,`name` STRING,`address` STRING,`age` INT,`color` STRING) WITH('connector' = 'kafka', 'topic' = 'test_flink10','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'kafka01.nyc.com:9092','value.format' = 'avro-confluent','key.format' = 'avro-confluent','key.fields' = 'keyid', 'value.avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:5037', 'value.avro-confluent.schema-registry.subject' = 'test_flink4', 'value.fields-include' = 'EXCEPT_KEY', 'key.avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:5037', 'key.avro-confluent.schema-registry.subject' = 'test_flink6', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.sasl.kerberos.kinit.cmd' = '/usr/local/bin/skinit --quiet', 'properties.sasl.mechanism' = 'GSSAPI');

Flink SQL> select * from test_flink11;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /tmp/jaas-6309821891889949793.conf

除了下面的注释外,/tmp/jaas-6309821891889949793.conf中没有任何内容

代码语言:javascript
复制
# We are using this file as an workaround for the Kafka and ZK SASL implementation
# since they explicitly look for java.security.auth.login.config property
# Please do not edit/delete this file - See FLINK-3929

SQL客户端运行命令

代码语言:javascript
复制
bin/sql-client.sh embedded --jar  flink-sql-connector-kafka_2.11-1.12.0.jar  --jar flink-sql-avro-confluent-registry-1.12.0.jar

Flink集群命令

代码语言:javascript
复制
bin/start-cluster.sh

如何为snippet传递这个java.security.auth.login.config和其他系统属性(我在上面的java代码片段中设置了这些属性)?

EN

回答 1

Stack Overflow用户

发布于 2021-02-23 11:29:57

flink-conf.yaml

代码语言:javascript
复制
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.principal: XXXXX@HADOOP.COM
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/kafka.keytab
security.kerberos.login.principal: XXXX@HADOOP.COM
security.kerberos.login.contexts: Client,KafkaClient

我还没有真正测试这个解决方案是否可行,你可以试一试,希望它能帮助你。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66328240

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档