首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink -将pojo串行化到Kafka接收器

Flink -将pojo串行化到Kafka接收器
EN

Stack Overflow用户
提问于 2021-01-10 13:02:11
回答 1查看 2.2K关注 0票数 0

我的Flink代码的结构是:使用kafka获取数据(topic_1_in)、->反序列化消息、->地图、->、操作数据->、获取POJO ->序列化消息->、用kafka发送数据(topic_1_out)

我现在在最后一个阶段,我想序列化我的POJO。我在Flink网站上找到了以下例子:

代码语言:javascript
复制
DataStream<String> stream = ...

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

    FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
            "my-topic",                  // target topic
            new SimpleStringSchema(),    // serialization schema
            properties,                  // producer config
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
    
    stream.addSink(myProducer);

但我不明白如何实现序列化模式。

我还读到了不同的可能性:

https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html

但是,我仍然有点困惑如何将我的POJO转换成一个字符串来填充Kafka接收器。这个类非常简单,所以我认为这是非常简单的。

代码语言:javascript
复制
public class POJO_block {
            public Double id;
            public Double tr_p;
            public Integer size;
            public Double last_info;
            public Long millis_last;
            private ArrayList<Tuple3<Integer, Integer, Integer>> list_val;

}

任何一个例子都会很感激。

谢谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-01-10 18:49:52

问题中提到的链接指的是内部Flink序列化,当Flink需要将我们的一些数据从集群的一个部分传送到另一个部分时,可以使用它,尽管在写入Kafka时不相关。

当Flink与外部存储(如Kafka )交互时,它依赖于连接器,而序列化的方式取决于连接器的配置细节以及底层外部存储的特定机制(例如,kafka记录中的键和值等概念)。

在您描述的示例中,由于您的程序使用的是DataStream API,并且正在与这里通信,所以您使用的连接器是Kafka,它的文档位于这里

在您提供的代码中,FlinkKafkaProducer接收器的这个参数指定序列化是如何发生的:

代码语言:javascript
复制
// this is probably not what you want:
new SimpleStringSchema(),    // serialization schema

此配置无法工作,因为SimpleStringSchema期望字符串作为输入,因此POJO_block流将使其失败。

您可以传递包含一个主函数的org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema的任何实现,允许您同时定义卡夫卡键的字节值和对应于每个POJO_block块实例的值(即下面的T ):

代码语言:javascript
复制
ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);

请注意,如果您使用Table来读取和编写Kafka而不是DataStream API,那么将使用这个连接器,它有一个方便的格式化配置,可以使用csv、json、avro、Debezium等现成格式.

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65653568

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档