首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer是否实现了SinkFunction<T> sinkFunction

org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer是否实现了SinkFunction<T> sinkFunction
EN

Stack Overflow用户
提问于 2021-08-28 21:18:19
回答 2查看 209关注 0票数 0

我正在尝试实现一个简单的闪光作业,使用org.apache.flink.streaming.connectors,将一个卡夫卡主题作为输入源和输出到一个卡夫卡接收器。我正在遵循这个指南https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/并编写代码

代码语言:javascript
复制
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new SimpleStringSchema(), props);  //FlinkKafkaConsumer<String> testKafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_TEST, new SimpleStringSchema(), props);
        kafkaConsumer.setStartFromEarliest();
        DataStream<String> dataStream = env.addSource(kafkaConsumer);

        StringSchema stringSchema = new StringSchema(TOPIC_OUT);
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(TOPIC_OUT, stringSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        //addSink((SinkFunction<String>) kafkaProducer);

        dataStream.addSink(kafkaProducer);

但是,addSink需要SinkFunction,而我提供了一个扩展TwoPhaseCommitSinkFunctionFlinkKafkaProducer。我搞不懂为什么它会抱怨而不能工作。

我的pom.xml文件如下

代码语言:javascript
复制
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.13.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
EN

回答 2

Stack Overflow用户

发布于 2021-08-28 21:56:36

Stack Overflow用户

发布于 2021-08-29 15:34:16

您正在使用的方法签名中没有FlinkKafkaProducer构造函数。你可以使用这个:

代码语言:javascript
复制
public FlinkKafkaProducer(
            String topicId,
            SerializationSchema<IN> serializationSchema,
            Properties producerConfig,
            @Nullable FlinkKafkaPartitioner<IN> customPartitioner,
            FlinkKafkaProducer.Semantic semantic,
            int kafkaProducersPoolSize)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68968184

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档