首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >FlinkCEP模式检测不是实时发生的

FlinkCEP模式检测不是实时发生的
EN

Stack Overflow用户
提问于 2021-04-21 05:19:50
回答 1查看 45关注 0票数 0

我仍然是Flink CEP库的新手,但我还不了解模式检测行为。考虑到下面的例子,我有一个Flink应用程序,它消耗来自kafka主题的数据,数据是定期产生的,我想使用Flink CEP模式来检测一个值何时大于给定的阈值。代码如下:

代码语言:javascript
复制
public class CEPJob{
    
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(),
                properties);

        consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

        DataStream<String> stream = env.addSource(consumer);

        // Process incoming data.
        DataStream<Stock> inputEventStream = stream.map(new MapFunction<String, Stock>() {

            private static final long serialVersionUID = -491668877013085114L;

            @Override
            public Stock map(String value) {
                String[] data = value.split(":");

                System.out.println("Date: " + data[0] + ", Adj Close: " + data[1]);

                Stock stock = new Stock(data[0], Double.parseDouble(data[1]));

                return stock;
            }
        });

        // Create the pattern
        Pattern<Stock, ?> myPattern = Pattern.<Stock>begin("first").where(new SimpleCondition<Stock>() {
            private static final long serialVersionUID = -6301755149429716724L;

            @Override
            public boolean filter(Stock value) throws Exception {
                return (value.getAdj_Close() > 140.0);
            }

        });

        // Create a pattern stream from our warning pattern
        PatternStream<Stock> myPatternStream = CEP.pattern(inputEventStream, myPattern);

        // Generate alert for each matched pattern
        DataStream<Stock> warnings = myPatternStream .select((Map<String, List<Stock>> pattern) -> {
            Stock first = pattern.get("first").get(0);

            return first;
        });

        warnings.print();

        env.execute("CEP job");
    }
}

当我运行作业时发生了什么,模式检测不是实时发生的,它只在第二条记录产生后才输出对当前记录检测到的模式的警告,它看起来延迟了打印到日志警告,我真的不知道如何让它在检测到模式时输出警告而不等待下一个记录,谢谢:)。

来自Kafka的数据是字符串格式:"date:value",它每5秒产生一次数据。

Java版本: 1.8,Scala版本: 2.11.12,Flink版本: 1.12.2,Kafka版本: 2.3.0

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-04-21 18:44:32

我发现的解决方案是,在Kafka主题中发送一个伪记录(例如,一个空对象),每次我生成一个值给该主题时,在Flink端(在模式声明中),我测试收到的记录是否是假的。在输出警告之前,FlinkCEP似乎总是等待即将到来的事件。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67186475

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档