首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >查找每个元组的Flink CEP检测延迟

查找每个元组的Flink CEP检测延迟
EN

Stack Overflow用户
提问于 2018-01-04 06:17:59
回答 1查看 157关注 0票数 0

我有一个简单的模式,如下所示

代码语言:javascript
复制
 Pattern<Event,?> pattern = Pattern.<Event>begin("s1")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) throws Exception {

                            Long time = System.nanoTime();
                            // here we are setting the time when this event is detected
                            event.setEdtl(time);

                            return event.getSensor_id() == 1 && event.getValue() > 150;
                        }
                    }).followedBy("s2")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) throws Exception {
                            Long time = System.nanoTime();

                            // here we are setting the time when this event is detected
                            event.setEdtl(time);

                            return event.getSensor_id() == 2 && event.getValue() > 15;
                        }
                    }).followedBy("s3")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) throws Exception {

                            Long time = System.nanoTime();
                            // here we are setting the time when this event is detected
                            event.setEdtl(time);
                            return event.getSensor_id() == 3 && event.getValue() > 35;
                        }
                    })
                    .within(Time.milliseconds(WindowLength_join__ms));

为了找出CEP检测时间的延迟,添加了如上所示的在模式中选择每个事件的时间。每个事件类都有一个参数Edtl (事件检测时间本地),该参数最初设置为0,稍后设置为System.nanoTime();

我在执行过程中遇到以下错误,但问题是该错误是在程序运行一段时间后出现的

代码语言:javascript
复制
    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.cep.operator.KeyedCEPPatternOperator.emitMatchedSequences(KeyedCEPPatternOperator.java:77)
    at org.apache.flink.cep.operator.KeyedCEPPatternOperator.processEvent(KeyedCEPPatternOperator.java:58)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:236)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 18 more
Caused by: java.io.IOException: Failed to send message 'patient_id=1, egtl_raw=null, edtg=null
' to socket server at localhost:6020. Connection re-tries are not enabled.
    at org.apache.flink.streaming.api.functions.sink.SocketClientSink.invoke(SocketClientSink.java:154)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 26 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:143)
    at org.apache.flink.streaming.api.functions.sink.SocketClientSink.invoke(SocketClientSink.java:146)

我想我之所以选择这个模式,是因为我在模式中同时进行读写操作。如果是这样,那么我应该如何找到Flink CEP中的平均复杂事件延迟。

EN

回答 1

Stack Overflow用户

发布于 2018-01-04 07:59:47

我已经想出了解决方案,这可能不是最好的方案,但它是有效的。在上图中,我们需要找出检测到CE的时间和最小原始事件时间,然后找出这两个时间之间的差异。我在这里使用minimum的原因是,假设复杂事件中包含3个streams a, b & c,所以无论谁需要等待最多,我们都会考虑它的时间。在我看来,它可以从图案中选择。

任务1:将时间戳分配给第一个模式,如下所示

代码语言:javascript
复制
if(perform_cep ){

        // performing some cep on merged stream


        Pattern<Event,?> pattern = Pattern.<Event>begin("s1")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {

                        // here we are setting the time when this event is detected

                        if(event.getSensor_id() == 1 && event.getValue() > 150){
                            Long time = System.currentTimeMillis();
                            event.setEdtl(time);
                            return true;

                        }
                        else return false;

                    }
                }).followedBy("s2")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {

                        return  event.getSensor_id() == 2 && event.getValue() > 15 ;

                    }
                }).followedBy("s3")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {

                       return  event.getSensor_id() == 3 && event.getValue() > 35;

                    }
                })
                .within(Time.milliseconds(WindowLength_join__ms));

任务2:为复杂事件流分配时间戳并查找延迟。这是按如下方式完成的

代码语言:javascript
复制
 PatternStream<Event> patternStream = CEP.pattern(mergedStream,pattern);



            DataStream<String> cep_stream = patternStream.select(new PatternSelectFunction<Event, String>() {


  @Override
            public String select(Map<String, List<Event>> map) throws Exception {

                Event s1 = map.get("s1").get(0);

                Integer patient_id = s1.getPatient_id();
                Integer val1 = s1.getValue();
                Long time_s1 = s1.getEdtl();



                Event s2 = map.get("s2").get(0);
                Integer val2 = s2.getValue();


                Event s3 = map.get("s3").get(0);
                Integer val3 = s3.getValue();



                System.out.println("value 1  = " + val1);
                System.out.println("value 2  = " + val2);
                System.out.println("value 3  = " + val3);



                Long current_time = System.currentTimeMillis();
                Long cep_latency = current_time - time_s1 ;
                System.out.println("cep_latency = " + cep_latency + "ms" );


                String event_data =  "patient_id=" + patient_id +
                               ", cep_latency=" + cep_latency ;


                return event_data+ "\n";


            }
        });

希望这有助于让我知道是否有其他解决这个问题的方法。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48085984

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档