首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink Kafka连接器运行时错误

Flink Kafka连接器运行时错误
EN

Stack Overflow用户
提问于 2016-10-12 22:22:23
回答 1查看 1.7K关注 0票数 0

我正在使用:

  • flink 1.1.2
  • Kafka 2.10-0.10.0.1
  • flink-connector-kafka-0.9.2.10-1.0.0

我正在使用以下非常简单/基本的应用程序

代码语言:javascript
复制
Properties properties = new Properties();                               
properties.setProperty("bootstrap.servers", "localhost:33334");         

properties.setProperty("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
properties.setProperty("group.id", "test");                             
String topic = "mytopic";                                                

FlinkKafkaConsumer09<String> fkc =                                      
    new FlinkKafkaConsumer09<String>(topic, new SimpleStringSchema(), properties);

DataStream<String> stream = env.addSource(fkc);    
env.execute()

在使用maven编译它之后,当我尝试使用以下命令运行时:

代码语言:javascript
复制
bin/flink run -c  com.mycompany.app.App fkaf/target/fkaf-1.0-SNAPSHOT.jar

我看到以下运行时错误:

代码语言:javascript
复制
Submitting job with JobID: f6e290ec7c28f66d527eaa5286c00f4d. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#-1679485245]
10/12/2016 15:10:06     Job execution switched to status RUNNING.
10/12/2016 15:10:06     Source: Custom Source(1/1) switched to SCHEDULED 
10/12/2016 15:10:06     Source: Custom Source(1/1) switched to DEPLOYING 
10/12/2016 15:10:06     Map -> Sink: Unnamed(1/1) switched to SCHEDULED 
10/12/2016 15:10:06     Map -> Sink: Unnamed(1/1) switched to DEPLOYING 
10/12/2016 15:10:06     Source: Custom Source(1/1) switched to RUNNING 
10/12/2016 15:10:06     Map -> Sink: Unnamed(1/1) switched to RUNNING 
10/12/2016 15:10:06     Map -> Sink: Unnamed(1/1) switched to CANCELED 
10/12/2016 15:10:06     Source: Custom Source(1/1) switched to FAILED 
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.open(FlinkKafkaConsumer09.java:282)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:722)

知道为什么找不到方法assign()吗?该方法位于lib/kafka-客户机-0.10.0.1.jar中。

代码语言:javascript
复制
    ParameterTool parameterTool = ParameterTool.fromArgs(args);             

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<String>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

    // print() will write the contents of the stream to the TaskManager's standard out stream
    // the rebelance call is causing a repartitioning of the data so that all machines
    // see the messages (for example in cases when "num kafka partitions" < "num flink operators"
    messageStream.rebalance().map(new MapFunction<String, String>() {       
        private static final long serialVersionUID = -6867736771747690202L; 

        @Override                                                           
        public String map(String value) throws Exception {                  
            return "Kafka and Flink says: " + value;                        
        }                                                                   
    }).print();                                                             

    env.execute();  
EN

回答 1

Stack Overflow用户

发布于 2016-10-12 22:52:45

NoSuchMethodError表示版本不匹配。

我想问题是,你试图连接一个卡夫卡0.9消费者和一个卡夫卡0.10的例子。Flink 1.1.x不提供卡夫卡0.10消费者。但是,在即将发布的1.2.0版本中将包含一个0.10的消费者。

您可以尝试从当前的主分支(1.2-快照)构建Kafka 0.10使用者,并使用Flink 1.1.2。相应的Flink API应该是稳定的,并且向后兼容,从1.2到1.1。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40009344

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档