首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Flink Kafka Producer中发送密钥

在Flink Kafka Producer中发送密钥
EN

Stack Overflow用户
提问于 2020-12-11 16:38:12
回答 2查看 778关注 0票数 0

我是Flink Stream processing的新手,需要一些Flink Kafka producer的帮助,因为经过一段时间的搜索后,找不到太多相关的东西。我目前正在从Kafka主题读取流,然后在执行一些计算后,我想将此写入Kafka中的一个新的独立主题。但我面临的问题是,我不能发送Kafka主题的密钥。我使用的是Flink Kafka连接器,它给了我FlinkKafkaConsumer和FlinkKafkaProducer。下面是我的代码,下面是我的代码,我可以在我的代码中更改什么,它可以工作,目前在Kafka上,我正在生成我的消息是使用null in Key,其中的值是我需要的:

代码语言:javascript
复制
Properties consumerProperties = new Properties();
    
    consumerProperties.setProperty("bootstrap.servers", serverURL);
    consumerProperties.setProperty("group.id", groupID);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(consumerTopicName,
            new SimpleStringSchema(), consumerProperties);

    kafkaConsumer.setStartFromEarliest();
    DataStream<String> kafkaConsumerStream = env.addSource(kafkaConsumer);
    final int[] tVoteCount = {0};
    
    DataStream<String> kafkaProducerStream = kafkaConsumerStream.map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws InterruptedException, IOException {
            JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
            Tcount = Tcount + jsonNode.get(key1).asInt();
            int nameCandidate = jsonNode.get(key2).asInt();
            System.out.println(Tcount);
            String tCountT = Integer.toString(Tcount);
            //tVoteCount = tVoteCount + voteCount;
             
            //waitForEventTime(timeStamp);
            return tCountT;
        }
    });
    kafkaConsumerStream.print();
    System.out.println("sdjknvksjdnv"+Tcount);
    Properties producerProperties = new Properties();
    producerProperties.setProperty("bootstrap.servers", serverURL);
    FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(producerTopicName,
            new SimpleStringSchema(), producerProperties);
    kafkaProducerStream.addSink(kafkaProducer);
    env.execute();

谢谢。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-12-11 17:03:55

在此blog中,您将找到一个关于如何将键和主题写入主题的示例:

您需要将您创建的new FlinkKafkaProducer替换为以下内容:

代码语言:javascript
复制
FlinkKafkaProducer<KafkaRecord> kafkaProducer = 
  new FlinkKafkaProducer<KafkaRecord>(
    producerTopicName, 
    ((record, timestamp) -> new ProducerRecord<byte[], byte[]>(producerTopicName, record.key.getBytes(), record.value.getBytes())), 
    producerProperties
  );
票数 1
EN

Stack Overflow用户

发布于 2020-12-11 17:46:10

如果您提供自己的KafkaSerializationSchema而不是使用SimpleStringSchema,那么您将完全控制所编写的内容。@mike在他的回答中提供了一个如何做到这一点的例子。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65248097

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档