首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink - kafka连接器OAUTHBEARER类加载程序问题

Flink - kafka连接器OAUTHBEARER类加载程序问题
EN

Stack Overflow用户
提问于 2021-09-06 12:13:48
回答 2查看 408关注 0票数 1

我尝试使用sasl机制(OAUTHBEARER)配置kafka身份验证(使用flink 1.9.2,kafka-client 2.2.0)。

当使用Flink与SASL身份验证时,我得到了下面的异常。卡夫卡是在一个厚厚的罐子与应用程序。

在远程调试之后,我发现我的回调处理程序有一个ChildFirstClassloader,并且ChildFirstClassloader属于另一个ChildFirstClassloader,因此下面的测试失败(OAuthBearerSaslClientFactory):

代码语言:javascript
复制
if (!(Objects.requireNonNull(callbackHandler) instanceof AuthenticateCallbackHandler))
    throw new IllegalArgumentException(String.format(
         "Callback handler must be castable to %s: %s",
         AuthenticateCallbackHandler.class.getName(), callbackHandler.getClass().getName()));

我不知道为什么这两个类有两个不同的类加载器。

有什么想法吗?有什么解决办法吗?

谢谢你的帮助。

代码语言:javascript
复制
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
    at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
    at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:180)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:176)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:168)
    at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:254)
    at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:202)
    at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:140)
    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:210)
    at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:334)
    at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:325)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:257)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:474)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
EN

回答 2

Stack Overflow用户

发布于 2022-01-28 21:02:05

我不知道你是否已经解决了这个问题,但是我和这个完全相同的场景斗争了很长一段时间。最后为我工作的是将kafka-客户端jar复制到Flink的lib/目录中。

票数 0
EN

Stack Overflow用户

发布于 2022-01-30 06:19:31

对不起,忘了发布解决方案,但是是的,我也以同样的方式解决了这个问题,我复制了flink中的kafka客户端。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69074363

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档