In my Flink script, I have a stream that I read from a Kafka topic, manipulate, and send back to Kafka using a sink.
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    Properties p = new Properties();
    p.setProperty("bootstrap.servers", servers_ip_list);
    p.setProperty("group.id", "Flink");

    FlinkKafkaConsumer<Event_N> kafkaData_N =
        new FlinkKafkaConsumer<>("CorID_0", new Ev_Des_Sch_N(), p);

    // Event time is taken from the event itself; timestamps are assumed to be ascending
    WatermarkStrategy<Event_N> wmStrategy =
        WatermarkStrategy
            .<Event_N>forMonotonousTimestamps()
            .withIdleness(Duration.ofMinutes(1))
            .withTimestampAssigner((event, timestamp) -> event.get_Time());

    DataStream<Event_N> stream_N = env.addSource(
        kafkaData_N.assignTimestampsAndWatermarks(wmStrategy));

The part above works fine, with no problems at all. The part below is where I run into trouble.
    String ProducerTopic = "CorID_0_f1";

    DataStream<Stream_Blocker_Pojo.block> box_stream_p = stream_N
        .keyBy((Event_N CorrID) -> CorrID.get_CorrID())
        .map(new Stream_Blocker_Pojo());

    FlinkKafkaProducer<Stream_Blocker_Pojo.block> myProducer = new FlinkKafkaProducer<>(
        ProducerTopic,
        new ObjSerializationSchema(ProducerTopic),
        p,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance

    box_stream_p.addSink(myProducer);

There are no errors and everything runs. Below is Stream_Blocker_Pojo, where I map the stream, manipulate it, and emit a new object (I simplified my code, keeping only 4 variables and removing all the math and data processing).
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

public class Stream_Blocker_Pojo extends RichMapFunction<Event_N, Stream_Blocker_Pojo.block> {

    public class block {
        public Double block_id;
        public Double block_var2;
        public Double block_var3;
        public Double block_var4;
    }

    // Keyed state: one block per CorrID (the stream is keyed before this map)
    private transient ValueState<block> state_a;

    @Override
    public void open(Configuration parameters) throws Exception {
        state_a = getRuntimeContext().getState(new ValueStateDescriptor<>("BoxState_a", block.class));
    }

    @Override
    public block map(Event_N input) throws Exception {
        p1.Stream_Blocker_Pojo.block current_a = state_a.value();
        if (current_a == null) {
            // First event for this key: initialise the state object
            current_a = new p1.Stream_Blocker_Pojo.block();
            current_a.block_id = 0.0;
            current_a.block_var2 = 0.0;
            current_a.block_var3 = 0.0;
            current_a.block_var4 = 0.0;
        }
        current_a.block_id = input.f_num_id;
        current_a.block_var2 = input.f_num_2;
        current_a.block_var3 = input.f_num_3;
        current_a.block_var4 = input.f_num_4;
        state_a.update(current_a);
        return new block();
    }
}

This is the implementation of the Kafka serialization schema:
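import com.fasterxml.jackson.core.JsonProcessingException;   // assuming plain (unshaded) Jackson
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ObjSerializationSchema implements KafkaSerializationSchema<Stream_Blocker_Pojo.block> {

    private final String topic;
    private transient ObjectMapper mapper;   // created lazily on first use

    public ObjSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Stream_Blocker_Pojo.block obj, Long timestamp) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        byte[] b;
        try {
            b = mapper.writeValueAsBytes(obj);
        } catch (JsonProcessingException e) {
            // An empty catch here would silently send a record with a null value
            throw new IllegalStateException("Could not serialize block", e);
        }
        return new ProducerRecord<>(topic, b);
    }
}

When I read the messages that the Flink script sends to Kafka, I find that all the variables are "null":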
CorrID b'{"block_id":null,"block_var1":null,"block_var2":null,"block_var3":null,"block_var4":null}'
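It looks like I'm sending an empty object with no values, but I'm struggling to understand what I'm doing wrong. I think the problem may be in the Stream_Blocker_Pojo implementation, or possibly in ObjSerializationSchema. Any help is much appreciated. Thanks.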
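The nulls can be reproduced without Flink or Kafka. The block fields are boxed Double, so a freshly constructed block holds null (not 0.0) in every field, and a serializer that reads public fields writes exactly those nulls. In the sketch below, plain reflection is only a rough stand-in for how Jackson treats public fields; NullFieldsDemo, Block and dump are hypothetical names, and this is not Jackson's actual code path.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.StringJoiner;

public class NullFieldsDemo {

    // Hypothetical stand-in for Stream_Blocker_Pojo.block: boxed Double fields default to null
    public static class Block {
        public Double block_id;
        public Double block_var2;
    }

    // Writes every public instance field as "name":value.
    // Field order from getFields() is not guaranteed by the JVM, so don't rely on it.
    public static String dump(Object obj) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Field f : obj.getClass().getFields()) {
            if (Modifier.isStatic(f.getModifiers())) {
                continue;
            }
            try {
                // String concatenation turns a null reference into the text null
                json.add("\"" + f.getName() + "\":" + f.get(obj));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return json.toString();
    }

    public static void main(String[] args) {
        // A freshly constructed object, like the one returned by "return new block();"
        System.out.println(dump(new Block()));

        Block filled = new Block();
        filled.block_id = 7.0;
        filled.block_var2 = 1.5;
        System.out.println(dump(filled));
    }
}
```

The first line printed lists every field as null, the second shows the assigned values, so field access by the serializer is not what produces the nulls.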
Posted on 2021-01-13 23:31:14
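There are two possible problems here:

1. Are you sure the block you emit doesn't have null fields? You may want to debug that part to be sure. In the posted map, the input values are copied into current_a and stored in state, but the method returns new block(), a fresh object whose boxed Double fields are all null.
2. For Jackson's ObjectMapper, you should provide getters and setters for your block; otherwise Jackson may not be able to access the fields. (Jackson does pick up public fields by default, and the field names do appear in the output above, so this is the less likely cause here.)

https://stackoverflow.com/questions/65711036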
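Putting both suggestions together, here is a plain-Java sketch with no Flink dependency: a HashMap keyed by correlation ID stands in for Flink's keyed ValueState (an assumption for illustration only, not Flink's API), Block is a bean-style public static nested class with a public no-argument constructor and getters/setters, and map returns the object it just updated. BlockerSketch, Block, Event and stateFor are hypothetical names. With Jackson's default naming, the JSON property names would follow the getters (blockId, blockVar2) rather than the original field names.

```java
import java.util.HashMap;
import java.util.Map;

public class BlockerSketch {

    // Bean-style block: public static nested class, public no-arg constructor,
    // private fields exposed through getters and setters
    public static class Block {
        private Double blockId;
        private Double blockVar2;

        public Block() { }

        public Double getBlockId() { return blockId; }
        public void setBlockId(Double blockId) { this.blockId = blockId; }
        public Double getBlockVar2() { return blockVar2; }
        public void setBlockVar2(Double blockVar2) { this.blockVar2 = blockVar2; }
    }

    // Minimal event carrying only the fields this sketch needs
    public static class Event {
        public final String corrId;
        public final double numId;
        public final double num2;

        public Event(String corrId, double numId, double num2) {
            this.corrId = corrId;
            this.numId = numId;
            this.num2 = num2;
        }
    }

    // One slot per key, mimicking ValueState after keyBy(...)
    private final Map<String, Block> state = new HashMap<>();

    public Block map(Event input) {
        Block current = state.get(input.corrId);   // like state_a.value(): null for a new key
        if (current == null) {
            current = new Block();
        }
        current.setBlockId(input.numId);
        current.setBlockVar2(input.num2);
        state.put(input.corrId, current);          // like state_a.update(current_a)
        return current;                            // emit the populated object, not a new Block()
    }

    // Read-only view of the simulated state, for inspection
    public Block stateFor(String key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        BlockerSketch fn = new BlockerSketch();
        fn.map(new Event("A", 1.0, 10.0));
        fn.map(new Event("B", 2.0, 20.0));
        Block out = fn.map(new Event("A", 3.0, 30.0));
        System.out.println(out.getBlockId() + " " + out.getBlockVar2());  // prints 3.0 30.0
    }
}
```

Because the nested class is static, it can be instantiated without an enclosing instance, which also keeps it closer to the POJO shape that serializers expect.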