首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >闪烁CEP未打印结果

闪烁CEP未打印结果
EN

Stack Overflow用户
提问于 2016-09-19 22:45:31
回答 1查看 633关注 0票数 2

我正在尝试打印一个字符串,如果使用Flink CEP库找到Hello和world。我的来源是Kafka,并使用控制台生产者输入数据。这一部分正在发挥作用。我可以打印出我在主题中输入的内容。但是,它不会打印出我的最后一条消息“世界真好!”。它甚至不会打印出它进入了lambda。下面是类

代码语言:javascript
复制
package kafka;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.Map;
import java.util.Properties;

/**
 * Created by crackerman on 9/16/16.
*/
public class WordCount {

public static void main(String[] args) throws Exception {

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("zookeeper.connect", "localhost:2181");
    properties.put("group.id", "test");
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<String> src = see.addSource(new FlinkKafkaConsumer08<>("complexString",
                                                                      new SimpleStringSchema(),
                                                                      properties));

    src.print();


    Pattern<String, String> pattern = Pattern.<String>begin("first")
            .where(evt -> evt.contains("Hello"))
            .followedBy("second")
            .where(evt -> evt.contains("World"));

    PatternStream<String> patternStream = CEP.pattern(src, pattern);

    DataStream<String> alerts = patternStream.flatSelect(
            (Map<String, String> in, Collector<String> out) -> {
                System.out.println("Made it to the lambda");
                String first = in.get("first");
                String second = in.get("second");
                System.out.println("First: " + first);
                System.out.println("Second: " + second);

                if (first.equals("Hello") && second.equals("World")) {

                    out.collect("The world is so nice!");
                }


            });

    alerts.print();

    see.execute();
}

}

任何帮助都将不胜感激。

谢谢!

EN

回答 1

Stack Overflow用户

发布于 2016-09-20 00:08:03

问题是下面这行

代码语言:javascript
复制
 see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

如果去掉它,它就会按照我预期的方式工作。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39575991

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档