首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Java - Flink在kafka接收器上发送空对象

Java - Flink在kafka接收器上发送空对象
EN

Stack Overflow用户
提问于 2021-01-13 23:02:12
回答 1查看 427关注 0票数 0

在我的flink脚本,我有一个流,我从一个卡夫卡主题,操纵它,并将它发送回卡夫卡使用水槽。

代码语言:javascript
复制
    public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    Properties p = new Properties();
    p.setProperty("bootstrap.servers", servers_ip_list);
    p.setProperty("gropu.id", "Flink");

   
    FlinkKafkaConsumer<Event_N> kafkaData_N =
            new FlinkKafkaConsumer("CorID_0", new Ev_Des_Sch_N(), p);
    WatermarkStrategy<Event_N> wmStrategy =
            WatermarkStrategy
                    .<Event_N>forMonotonousTimestamps()
                    .withIdleness(Duration.ofMinutes(1))
                    .withTimestampAssigner((Event, timestamp) -> {
                        return Event.get_Time();
                    });
    DataStream<Event_N> stream_N = env.addSource(
            kafkaData_N.assignTimestampsAndWatermarks(wmStrategy));

上面的部分工作很好,一点问题都没有,下面的部分是我得到问题的地方。

代码语言:javascript
复制
    String ProducerTopic = "CorID_0_f1";

    DataStream<Stream_Blocker_Pojo.block> box_stream_p= stream_N
                .keyBy((Event_N CorrID) -> CorrID.get_CorrID())
                .map(new Stream_Blocker_Pojo());

    FlinkKafkaProducer<Stream_Blocker_Pojo.block> myProducer = new FlinkKafkaProducer<>(
                ProducerTopic,
                new ObjSerializationSchema(ProducerTopic),
                p,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance

     box_stream_p.addSink(myProducer);

没有错误,一切正常,这是一个Stream_Blocker_Pojo,我在这里映射一个流,操作它并发送一个新的(我简化了我的代码,只保留了4个变量,删除了所有的数学和数据处理)。

代码语言:javascript
复制
public class Stream_Blocker_Pojo extends RichMapFunction<Event_N, Stream_Blocker_Pojo.block>
{

        public class block {
        public Double block_id;
        public Double block_var2 ;
        public Double block_var3;
        public Double block_var4;}
        
        private transient ValueState<block> state_a;
        
        @Override
        public void open(Configuration parameters) throws Exception {
            state_a = getRuntimeContext().getState(new ValueStateDescriptor<>("BoxState_a", block.class));
        }

        public block map(Event_N input) throws Exception {

        p1.Stream_Blocker_Pojo.block current_a = state_a.value();

            if (current_a == null) {
                current_a = new p1.Stream_Blocker_Pojo.block();
                current_a.block_id = 0.0;
                current_a.block_var2 = 0.0;
                current_a.block_var3 = 0.0;
                current_a.block_var4 = 0.0;}

        
            current_a.block_id = input.f_num_id;
            current_a.block_var2 = input.f_num_2;
            current_a.block_var3 = input.f_num_3;
            current_a.tblock_var4 = input.f_num_4;
          
            state_a.update(current_a);
            return new block();
        };   
    }

这是Kafka序列化模式的实现。

代码语言:javascript
复制
public class ObjSerializationSchema implements KafkaSerializationSchema<Stream_Blocker_Pojo.block>{

    private String topic;
    private ObjectMapper mapper;

    public ObjSerializationSchema(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Stream_Blocker_Pojo.block obj, Long timestamp) {
        byte[] b = null;
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            b= mapper.writeValueAsBytes(obj);
        } catch (JsonProcessingException e) {

        }
        return new ProducerRecord<byte[], byte[]>(topic, b);
    }

}

当我使用kafka打开从Flink脚本发送的消息时,我发现所有变量都是"null“

CorrID b‘{“block_id”:空,"block_var1":null,"block_var2":null,"block_var3":null,"block_var4":null}

看来我在发送一个没有值的空obj。但我很难理解我做错了什么。我认为问题可能是Stream_Blocker_Pojo的实现,也可能是ObjSerializationSchema,,任何帮助都是非常感谢的。谢谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-01-13 23:31:14

这里有两个可能的问题:

  1. 您确定您要传递的block类型的变量没有空字段吗?您可能希望调试该部分以确定。
  2. 的原因可能也在ObjectMapper中,您应该为您的block提供getter和setter,否则杰克逊可能无法访问它们。
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65711036

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档