弗林克新手来了。
我有一个本地设置,其中Flink处理由Debezium生成的JSON事件。这样做的目的是监视客户支持代理和客户之间的对话。如果客户支持代理没有在特定的时间范围内回复消息,我想发出警告。
我的Flink CEP代码如下所示:
// We create a watermark strategy
WatermarkStrategy<MessageEvent> watermarkStrategy = WatermarkStrategy
.<MessageEvent>forMonotonousTimestamps()
.withTimestampAssigner((event, ts) -> event.getPayload().getAfter().getCreatedAt().longValue());
// We get our inputs here
KafkaSource<String> msgTopic = Sources.kafkaSource("message", "flink");
DataStream<String> input = env.fromSource(msgTopic, WatermarkStrategy.noWatermarks(), "Message Source");
// Convert the Debezium payload into MessageEvent and assign watermarks
// Partition by thread ID
DataStream<MessageEvent> msgStream = input.map(new MessageMapper())
.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(msg -> msg.getPayload().getAfter().getThreadId());
Pattern<MessageEvent, ?> pattern = Pattern.<MessageEvent>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
.where(new MessageEventDirectionFilter("INCOMING"))
.oneOrMore()
.greedy()
.notNext("end")
.where(new MessageEventDirectionFilter("OUTGOING"))
.within(Time.seconds(10));
CEP.pattern(msgStream, pattern)
.inEventTime()
.process(new IdleConversationProcessFunction())
.print();INCOMING消息是由客户发送的消息,因为OUTGOING消息是由支持代理发送的消息。这就是我想要捕捉的东西。我可以看到记录被发送到STDOUT操作符,但是任务管理器日志中没有显示任何记录。

为什么定义的模式不匹配?
发布于 2022-10-08 12:48:28
PrintSink不写入任务管理器日志。相反,它将写入位于日志目录中的单独文件(名称以.out结尾)。可能是模式匹配,但你忽略了输出?
顺便说一句,您应该忽略这样一个事实: web显示从打印接收器发送到STDOUT的0条记录。显示发送(和接收)的记录(和字节)的度量标准仅包括通过Flink的内部网络堆栈的流量。这些指标中不包括与外部系统(源和接收器)的连接。
但是,我怀疑notNext是否支持以这种方式使用,所以如果真的没有输出,我也不会感到惊讶。
如果您定义模式以匹配答复在时间上确实发生的情况,然后使用processTimedOutMatch方法捕获您真正想要的情况(即超时时),那么就可以工作了。
有关更多详细信息,请参阅处理超时部分模式。
https://stackoverflow.com/questions/73996083
复制相似问题