I'm trying to implement a simple Flink job that uses org.apache.flink.streaming.connectors, reading from a Kafka topic as the input source and writing to a Kafka sink. I'm following this guide https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/ and wrote this code:
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new SimpleStringSchema(), props);
//FlinkKafkaConsumer<String> testKafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_TEST, new SimpleStringSchema(), props);
kafkaConsumer.setStartFromEarliest();
DataStream<String> dataStream = env.addSource(kafkaConsumer);

StringSchema stringSchema = new StringSchema(TOPIC_OUT);
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(TOPIC_OUT, stringSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
//dataStream.addSink((SinkFunction<String>) kafkaProducer);
dataStream.addSink(kafkaProducer);

However, addSink expects a SinkFunction, and I'm passing a FlinkKafkaProducer, which extends TwoPhaseCommitSinkFunction. I can't figure out why the compiler complains and this doesn't work.

My pom.xml contains the following:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.13.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>

Posted on 2021-08-28 21:56:36
Answered on 2021-08-29 15:34:16
There is no FlinkKafkaProducer constructor matching the signature you are using. You can use this one instead:
public FlinkKafkaProducer(
        String topicId,
        SerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        @Nullable FlinkKafkaPartitioner<IN> customPartitioner,
        FlinkKafkaProducer.Semantic semantic,
        int kafkaProducersPoolSize)

https://stackoverflow.com/questions/68968184