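Suppose I have two DataStream<String>s. I receive the streams from Kafka, and after some processing I end up with these two streams.
DataStream<String> A contains values {id1_id2, id3_id4, id99_id0, id15_id3, id11_id5, ...}
DataStream<String> B contains values {id2, id3, id5, ...}
Is it possible to process DataStream A so that it outputs values to another stream,
DataStream<String> C = {id1, id3, id15, id11}
so that all values present in B are intersected with A? I have tried processElement() and a RichCoFlatMapFunction, but neither works.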
public class MatchAggregator
        extends RichCoFlatMapFunction<String, String, Tuple1<String>> {

    private ValueState<String> doubleState;
    private ValueState<String> singleState;

    @Override
    public void open(Configuration config) {
        doubleState = getRuntimeContext().getState(new ValueStateDescriptor<>("doubleEvents", String.class));
        singleState = getRuntimeContext().getState(new ValueStateDescriptor<>("singleEvents", String.class));
    }

    @Override
    public void flatMap1(String s, Collector<Tuple1<String>> collector) throws Exception {
        String single = singleState.value();
        // this is outputting null.
        System.out.println(single);
        // s is also null
        if (single.contains(s)) {
            String replaceNumber = single.replace(s, "");
            String replaceEmp = replaceNumber.replace("_", "");
            single.clear();
            collector.collect(Tuple1.of(replaceEmp));
        } else {
            personContactState.update(s);
        }
    }

    @Override
    public void flatMap2(String s, Collector<Tuple1<String>> collector) throws Exception {
    }
}
I use the two DataStreams as follows:
DataStream<Tuple1<String>> match = A.connect(B).flatMap(new MatchAggregator());
match.print();
Posted on 2020-12-10 19:04:24
The exact behavior of the RichCoFlatMapFunction will depend on how you have keyed the two connected streams.
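String single = singleState.value() will retrieve whatever value was previously stored for the same key as the key of the incoming String s. In the code you have shared, update is never called on singleState, so singleState.value() will always be null.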
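To make the point about keying and state concrete, here is a minimal sketch in plain Java, not Flink code. It mimics what a keyed two-input function would have to do: each side stores something under the key, so the other side finds it later. The class and method names (KeyedMatchSketch, onPair, onSingle) are hypothetical, and the HashMap and HashSet only stand in for Flink's per-key state. The sketch assumes every element of A has the form left_right and that matching is on the id after the underscore, which fits most but not all of the sample output in the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Plain-Java stand-in for a keyed two-input operator; illustrative only. */
class KeyedMatchSketch {
    // Stand-ins for keyed state: one entry per key, as keyed state would be scoped.
    private final Map<String, List<String>> pendingPairs = new HashMap<>();
    private final Set<String> seenSingles = new HashSet<>();

    /** Counterpart of flatMap1: called for each element of A ("left_right"). */
    List<String> onPair(String pair) {
        List<String> out = new ArrayList<>();
        int idx = pair.indexOf('_');
        if (idx < 0) {
            return out; // assumption: malformed elements are ignored
        }
        String left = pair.substring(0, idx);
        String key = pair.substring(idx + 1); // assumed key: the id after '_'
        if (seenSingles.contains(key)) {
            out.add(left); // the matching B element arrived earlier
        } else {
            // Buffer the pair under its key until a matching B element shows up.
            pendingPairs.computeIfAbsent(key, k -> new ArrayList<>()).add(left);
        }
        return out;
    }

    /** Counterpart of flatMap2: called for each element of B (a bare id). */
    List<String> onSingle(String single) {
        // This is the "update" step missing from the question's code.
        seenSingles.add(single);
        List<String> waiting = pendingPairs.remove(single);
        return waiting == null ? new ArrayList<>() : new ArrayList<>(waiting);
    }

    public static void main(String[] args) {
        KeyedMatchSketch op = new KeyedMatchSketch();
        System.out.println(op.onPair("id1_id2"));   // [] : buffered, no B yet
        System.out.println(op.onSingle("id2"));     // [id1]
        System.out.println(op.onSingle("id5"));     // [] : remembered for later
        System.out.println(op.onPair("id11_id5"));  // [id11]
    }
}
```

In Flink terms, this would roughly correspond to keying A by the id after the underscore and B by its whole value before connecting them, and having flatMap2 write to state instead of being empty. Note that the sketch never expires anything, so both collections grow without bound; a real job would need some cleanup policy.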
https://stackoverflow.com/questions/65203692