首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >FlinkKafkaProducer的重载构造函数

FlinkKafkaProducer的重载构造函数
EN

Stack Overflow用户
提问于 2021-05-24 01:43:02
回答 1查看 144关注 0票数 1

我正在尝试使用Scala和Flink将消息发布到Kafka主题中。但是,当使用the documentation中提供的代码创建FlinkKafkaProducer对象时,它告诉我无法应用构造器。以下是代码示例:

代码语言:javascript
复制
val studentProducer = new FlinkKafkaProducer[String](
 "my_topic",                  // target topic
 new SimpleStringSchema(),    // serialization schema
 properties,                  // producer config
 FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance

具有以下导入:

代码语言:javascript
复制
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

import java.util.Properties

这就是我得到的错误:

代码语言:javascript
复制
/home/user/Flink/flinkproj/src/main/scala/org/flink/Job.scala:83:27: overloaded method constructor FlinkKafkaProducer with alternatives:
[error]   (x$1: String,x$2: org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema[String],x$3: java.util.Properties,x$4: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic)org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer[String] <and>
[error]   (x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedSerializationSchema[String],x$3: java.util.Properties,x$4: java.util.Optional[org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner[String]])org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer[String] <and>
[error]   (x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedSerializationSchema[String],x$3: java.util.Properties,x$4: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic)org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer[String] <and>
[error]   (x$1: String,x$2: org.apache.flink.api.common.serialization.SerializationSchema[String],x$3: java.util.Properties,x$4: java.util.Optional[org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner[String]])org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer[String]
[error]  cannot be applied to (String, org.apache.flink.streaming.util.serialization.SimpleStringSchema, java.util.Properties, org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic)
[error]     val studentProducer = new FlinkKafkaProducer[String](

变量propertiesjava.util.Properties的一个实例。我认为它必须在字符串序列化程序中,但我看不出有什么问题。

以下是来自build.sbt的版本的详细信息

代码语言:javascript
复制
ThisBuild / scalaVersion := "2.11.8"

val flinkVersion = "1.11.3"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion % "provided")
EN

回答 1

Stack Overflow用户

发布于 2021-05-24 04:43:28

我认为文档已经过时了,您需要提供KafkaSerializationSchemaKeyedSerializationSchema --或者如果您使用SerializationSchema,则还需要提供FlinkKafkaPartitioner

我没有用Scala写的例子,但是这里有一个用Java写的例子,展示了如何实现一个使用ObjectMapper写出JSON的KafkaSerializationSchema

代码语言:javascript
复制
/**
 * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
 *
 */
public class ClickEventStatisticsSerializationSchema implements KafkaSerializationSchema<ClickEventStatistics> {

    private static final ObjectMapper objectMapper = new ObjectMapper();
    private String topic;

    public ClickEventStatisticsSerializationSchema(){
    }

    public ClickEventStatisticsSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            final ClickEventStatistics message, @Nullable final Long timestamp) {
        try {
            //if topic is null, default topic will be used
            return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Could not serialize record: " + message, e);
        }
    }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67662718

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档