首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用AWS kinesis的Apache flink消耗数据-获取java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer

使用AWS kinesis的Apache flink消耗数据-获取java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer
EN

Stack Overflow用户
提问于 2021-09-02 13:48:57
回答 1查看 298关注 0票数 0

这是我连接aws kinesis的代码。当我尝试连接FlinkKinesisConsumer时,它抛出了一个错误,比如Classnotfound异常。

代码语言:javascript
复制
import configs.AWSConfigConstants;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class StreamingJob {
    public static void main(String[] args) {
        try {
            final StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            String region = "us-east-1";
            String inputStreamName = "";
            String accesskey = "";
            String secretkey = "";
            String initPosition = "LATEST";
            String arn = "";
            
            Properties consumerConfig = new Properties();           
            consumerConfig.put(AWSConfigConstants.AWS_REGION, region);
            consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, accesskey);
            consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretkey);
            consumerConfig.put(AWSConfigConstants.AWS_ROLE_ARN, arn );          
            consumerConfig.put(AWSConfigConstants.STREAM_INITIAL_POSITION, initPosition);

            System.out.println("Consume config properties:");
            System.out.println(consumerConfig);
            
            DataStream<String> kinesisInputStream = sEnv.addSource(new FlinkKinesisConsumer<>
            (inputStreamName,new SimpleStringSchema(),consumerConfig));
            
            System.out.println(kinesisInputStream);
            sEnv.execute("Flink Streaming Processor");          
        } catch(Exception e) {
            System.out.println(e);
        }
    }

我得到了下面的错误

Output Screenshot

提前感谢您的帮助。

EN

回答 1

Stack Overflow用户

发布于 2021-09-02 19:09:30

您需要在构建中包含适当版本的连接器。类似这样,其中2.11是scala版本,1.13.2是Flink版本。

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis_2.11</artifactId>
    <version>1.13.2</version>
</dependency>

有关更多信息,请参阅the docs

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69031596

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档