我是Flink Stream processing的新手,需要一些Flink Kafka producer的帮助,因为经过一段时间的搜索后,找不到太多相关的东西。我目前正在从Kafka主题读取流,然后在执行一些计算后,我想将此写入Kafka中的一个新的独立主题。但我面临的问题是,我不能发送Kafka主题的密钥。我使用的是Flink Kafka连接器,它给了我FlinkKafkaConsumer和FlinkKafkaProducer。下面是我的代码,下面是我的代码,我可以在我的代码中更改什么,它可以工作,目前在Kafka上,我正在生成我的消息是使用null in Key,其中的值是我需要的:
Properties consumerProperties = new Properties();
consumerProperties.setProperty("bootstrap.servers", serverURL);
consumerProperties.setProperty("group.id", groupID);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(consumerTopicName,
new SimpleStringSchema(), consumerProperties);
kafkaConsumer.setStartFromEarliest();
DataStream<String> kafkaConsumerStream = env.addSource(kafkaConsumer);
final int[] tVoteCount = {0};
DataStream<String> kafkaProducerStream = kafkaConsumerStream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws InterruptedException, IOException {
JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
Tcount = Tcount + jsonNode.get(key1).asInt();
int nameCandidate = jsonNode.get(key2).asInt();
System.out.println(Tcount);
String tCountT = Integer.toString(Tcount);
//tVoteCount = tVoteCount + voteCount;
//waitForEventTime(timeStamp);
return tCountT;
}
});
kafkaConsumerStream.print();
System.out.println("sdjknvksjdnv"+Tcount);
Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", serverURL);
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(producerTopicName,
new SimpleStringSchema(), producerProperties);
kafkaProducerStream.addSink(kafkaProducer);
env.execute();谢谢。
发布于 2020-12-11 17:03:55
在此blog中,您将找到一个关于如何将键和主题写入主题的示例:
您需要将您创建的new FlinkKafkaProducer替换为以下内容:
FlinkKafkaProducer<KafkaRecord> kafkaProducer =
new FlinkKafkaProducer<KafkaRecord>(
producerTopicName,
((record, timestamp) -> new ProducerRecord<byte[], byte[]>(producerTopicName, record.key.getBytes(), record.value.getBytes())),
producerProperties
);发布于 2020-12-11 17:46:10
如果您提供自己的KafkaSerializationSchema而不是使用SimpleStringSchema,那么您将完全控制所编写的内容。@mike在他的回答中提供了一个如何做到这一点的例子。
https://stackoverflow.com/questions/65248097
复制相似问题