首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么Flink CEP不匹配模式?

为什么Flink CEP不匹配模式?
EN

Stack Overflow用户
提问于 2022-10-08 10:04:56
回答 1查看 36关注 0票数 0

弗林克新手来了。

我有一个本地设置,其中Flink处理由Debezium生成的JSON事件。这样做的目的是监视客户支持代理和客户之间的对话。如果客户支持代理没有在特定的时间范围内回复消息,我想发出警告。

我的Flink CEP代码如下所示:

代码语言:javascript
复制
    // We create a watermark strategy
    WatermarkStrategy<MessageEvent> watermarkStrategy = WatermarkStrategy
            .<MessageEvent>forMonotonousTimestamps()
            .withTimestampAssigner((event, ts) -> event.getPayload().getAfter().getCreatedAt().longValue());

    // We get our inputs here
    KafkaSource<String> msgTopic = Sources.kafkaSource("message", "flink");
    DataStream<String> input = env.fromSource(msgTopic, WatermarkStrategy.noWatermarks(), "Message Source");

    // Convert the Debezium payload into MessageEvent and assign watermarks
    // Partition by thread ID
    DataStream<MessageEvent> msgStream = input.map(new MessageMapper())
            .assignTimestampsAndWatermarks(watermarkStrategy)
            .keyBy(msg -> msg.getPayload().getAfter().getThreadId());

    Pattern<MessageEvent, ?> pattern = Pattern.<MessageEvent>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
                    .where(new MessageEventDirectionFilter("INCOMING"))
                    .oneOrMore()
                    .greedy()
                    .notNext("end")
                    .where(new MessageEventDirectionFilter("OUTGOING"))
                    .within(Time.seconds(10));

    CEP.pattern(msgStream, pattern)
            .inEventTime()
            .process(new IdleConversationProcessFunction())
            .print();

INCOMING消息是由客户发送的消息,因为OUTGOING消息是由支持代理发送的消息。这就是我想要捕捉的东西。我可以看到记录被发送到STDOUT操作符,但是任务管理器日志中没有显示任何记录。

为什么定义的模式不匹配?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-10-08 12:48:28

PrintSink不写入任务管理器日志。相反,它将写入位于日志目录中的单独文件(名称以.out结尾)。可能是模式匹配,但你忽略了输出?

顺便说一句,您应该忽略这样一个事实: web显示从打印接收器发送到STDOUT的0条记录。显示发送(和接收)的记录(和字节)的度量标准仅包括通过Flink的内部网络堆栈的流量。这些指标中不包括与外部系统(源和接收器)的连接。

但是,我怀疑notNext是否支持以这种方式使用,所以如果真的没有输出,我也不会感到惊讶。

如果您定义模式以匹配答复在时间上确实发生的情况,然后使用processTimedOutMatch方法捕获您真正想要的情况(即超时时),那么就可以工作了。

有关更多详细信息,请参阅处理超时部分模式

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73996083

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档