首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >FLINK CEP (Java 8) -通过匹配模式持久的“身份”

FLINK CEP (Java 8) -通过匹配模式持久的“身份”
EN

Stack Overflow用户
提问于 2019-05-28 19:02:48
回答 1查看 142关注 0票数 0

我试图使用FLINK-CEP来衡量一个市场上从BidState.OPENBidState.Closed的投标时间。我收到了一个DataStream的出价与ID和州,和它的立场,我是匹配所有的“开放”出价与所有“关闭”出价。

我有一个条件,在patternStream.process,它只允许开放和结束出价与相同的ID成对,因为他们应该是。但这感觉不对,因为比赛的数量以这种方式以惊人的速度增长,我有一种感觉,这可以用模式来完成。因此,是否有一种方法可以确保"start“和"end”对象具有相同的ID?

代码语言:javascript
复制
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
//Is it possible to make sure that start.BidID == end.BidID in the pattern?
Pattern<BidEvent, ?> pattern = Pattern.<BidEvent>begin("start", skipStrategy).where(
        new SimpleCondition<BidEvent>() {
            @Override
            public boolean filter(BidEvent value) {
                return value.getState() == BidState.OPENED;
            }
        }).followedByAny("end").where(
        new SimpleCondition<BidEvent>() {
            @Override
            public boolean filter(BidEvent value) throws Exception {
                return value.getState() == BidState.CLOSED; // && value.getBidID == start.getBidID?
            }
        }).within(timeout);

PatternStream<BidEvent> patternStream = CEP.pattern(BidEventDataStream, pattern);

patternStream.process(new PatternProcessFunction<BidEvent, MatchingDuration>() {
    @Override
    public void processMatch(Map<String
            , List<BidEvent>> map
            , Context context
            , Collector<MatchingDuration> collector) {

        BidEvent start = map.get("start").get(0);
        BidEvent end = map.get("end").get(0);
        if (start.getBidId() == end.getBidId()){ // Make sure opening and closing bid is the same. Can this be done in the pattern?
            collector.collect(new MatchingDuration(start.getBidId(), (end.getTimestamp() - start.getTimestamp())));
        }
    }
}).addSink(matchingDurationSinkFunction);
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-05-29 08:41:07

我想出了如何获得我想要的行为:必须对BidEventDataStream 进行键控,以便在具有相同键的对象上进行模式匹配。问题中的代码不需要更改,但是必须编辑BidEventDataStream才能捕获BidEvent.getBidId()

代码语言:javascript
复制
BidEventDataStream.keyBy(new KeySelector<BidEvent, Long>() {
                    @Override
                    public Long getKey(BidEventvalue) {
                        return value.getBidId();
                    }
                })
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56348581

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档