This is my code for connecting to AWS Kinesis. When I try to connect using FlinkKinesisConsumer, it throws an error: a ClassNotFoundException.
import configs.AWSConfigConstants;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class StreamingJob {
    public static void main(String[] args) {
        try {
            final StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();

            // Connection settings (values left blank here)
            String region = "us-east-1";
            String inputStreamName = "";
            String accesskey = "";
            String secretkey = "";
            String initPosition = "LATEST";
            String arn = "";

            // Properties passed to the Kinesis consumer
            Properties consumerConfig = new Properties();
            consumerConfig.put(AWSConfigConstants.AWS_REGION, region);
            consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, accesskey);
            consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretkey);
            consumerConfig.put(AWSConfigConstants.AWS_ROLE_ARN, arn);
            consumerConfig.put(AWSConfigConstants.STREAM_INITIAL_POSITION, initPosition);
            System.out.println("Consume config properties:");
            System.out.println(consumerConfig);

            // Register the Kinesis stream as a source
            DataStream<String> kinesisInputStream = sEnv.addSource(
                    new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), consumerConfig));
            System.out.println(kinesisInputStream);

            sEnv.execute("Flink Streaming Processor");
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

I get the error below.
Thanks in advance for your help.
Posted on 2021-09-02 19:09:30
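You need to include the appropriate version of the connector in your build. Something like this, where 2.11 is the Scala version and 1.13.2 is the Flink version:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis_2.11</artifactId>
    <version>1.13.2</version>
</dependency>

See the docs for more information.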
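To confirm whether the connector actually made it onto the runtime classpath, a small check like the following can help. This is only a sketch: the class name `ConnectorClasspathCheck` and the helper `isOnClasspath` are made up for illustration, and it assumes the missing class is `FlinkKinesisConsumer` itself, since the question does not show which class the exception names.

```java
// Hypothetical helper (not part of Flink): reports whether a class can be
// loaded by the class loader that loaded this check.
class ConnectorClasspathCheck {

    // Returns true if the named class can be found, false otherwise.
    // The class is located but not initialized (second argument is false).
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ConnectorClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: this is the class reported as missing.
        String connector = "org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer";
        System.out.println(connector + " on classpath: " + isOnClasspath(connector));
    }
}
```

If this prints `false` when run with the same classpath as the job, the connector dependency is probably missing from the build or not bundled into the job JAR.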
https://stackoverflow.com/questions/69031596