首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >同时处理两个DataStream<String>,以便在flink中找到一个DataStream包含来自其他DataStream的值?

同时处理两个DataStream<String>,以便在flink中找到一个DataStream包含来自其他DataStream的值?
EN

Stack Overflow用户
提问于 2020-12-09 01:17:18
回答 1查看 76关注 0票数 0

假设我有两个DataStream<String>,我从Kafka收到了流,经过一些处理,我得到了这两个流。

代码语言:javascript
复制
DataStream<String> A contains values {id1_id2 , id3_id4, id99_id0, id15_id3,id11_id5....}

DataStream<String> B contains values {id2, id3,id5...}

是否可以对DataStream A进行一些处理,以便它将值输出到另一个

代码语言:javascript
复制
DataStream<String> C ={id1, id3, id15, id11}

因此,B中存在的所有值都将与A相交。我尝试过使用processElement()和RichCoFlatMapFunction,但都不起作用。

代码语言:javascript
复制
public class MatchAggregator
        extends RichCoFlatMapFunction<String, String, Tuple1<String>> {

    private ValueState<String> doubleState;
    private ValueState<String> singleState;

    @Override
    public void open(Configuration config) {

        doubleState = getRuntimeContext().getState(new ValueStateDescriptor<>("doubleEvents",String.class));
        singleState = getRuntimeContext().getState(new ValueStateDescriptor<>("singleEvents",String.class));
    }
    
    @Override
    public void flatMap1(String s, Collector<Tuple1<String>> collector) throws Exception {
        String single = singleState.value();
       //this is outputting null.
        System.out.println(single);
      //s is also null
        if(single.contains(s)){
            String replaceNumber = single.replace(s,"");
            String replaceEmp = replaceNumber.replace("_","");
            single.clear();
            collector.collect(Tuple1.of(replaceEmp));
        }else {
            personContactState.update(s);
        }
    }

    @Override
    public void flatMap2(String s, Collector<Tuple1<String>> collector) throws Exception {

        
    }
}

我使用两个DataStreams,如下所示:

代码语言:javascript
复制
DataStream<Tuple1<String>> match = A.connect(B).flatMap(new MatchAggregator());

match.print();
EN

回答 1

Stack Overflow用户

发布于 2020-12-10 19:04:24

RichCoFlatMapFunction的确切行为将取决于您对两个连接的流设置关键帧的方式。

String single = singleState.value()将检索与传入String s的键相同的键之前存储的任何值。在您共享的代码中,singleState从未调用过update,因此singleState.value()将始终为null。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65203692

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档