我的Flink代码的结构是:使用kafka获取数据(topic_1_in)、->反序列化消息、->地图、->、操作数据->、获取POJO ->序列化消息->、用kafka发送数据(topic_1_out)
我现在在最后一个阶段,我想序列化我的POJO。我在Flink网站上找到了以下例子:
DataStream<String> stream = ...
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
"my-topic", // target topic
new SimpleStringSchema(), // serialization schema
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
stream.addSink(myProducer);但我不明白如何实现序列化模式。
我还读到了不同的可能性:
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
但是,我仍然有点困惑如何将我的POJO转换成一个字符串来填充Kafka接收器。这个类非常简单,所以我认为这是非常简单的。
public class POJO_block {
public Double id;
public Double tr_p;
public Integer size;
public Double last_info;
public Long millis_last;
private ArrayList<Tuple3<Integer, Integer, Integer>> list_val;
}任何一个例子都会很感激。
谢谢
发布于 2021-01-10 18:49:52
问题中提到的链接指的是内部Flink序列化,当Flink需要将我们的一些数据从集群的一个部分传送到另一个部分时,可以使用它,尽管在写入Kafka时不相关。
当Flink与外部存储(如Kafka )交互时,它依赖于连接器,而序列化的方式取决于连接器的配置细节以及底层外部存储的特定机制(例如,kafka记录中的键和值等概念)。
在您描述的示例中,由于您的程序使用的是DataStream API,并且正在与这里通信,所以您使用的连接器是Kafka,它的文档位于这里。
在您提供的代码中,FlinkKafkaProducer接收器的这个参数指定序列化是如何发生的:
// this is probably not what you want:
new SimpleStringSchema(), // serialization schema此配置无法工作,因为SimpleStringSchema期望字符串作为输入,因此POJO_block流将使其失败。
您可以传递包含一个主函数的org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema的任何实现,允许您同时定义卡夫卡键的字节值和对应于每个POJO_block块实例的值(即下面的T ):
ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);请注意,如果您使用Table来读取和编写Kafka而不是DataStream API,那么将使用这个连接器,它有一个方便的格式化配置,可以使用csv、json、avro、Debezium等现成格式.
https://stackoverflow.com/questions/65653568
复制相似问题