MailboxProcessor MailboxProcessor 可以认为是 Mailbox 相关的核心入口,MailboxProcessor 的核心方法就是事件循环,这个循环中主要是从 TaskMailbox ,主要用于 isIdle 方法 private final MailboxProcessor mailboxProcessor; MailboxExecutor 的主要作用是向 TaskMailbox new TaskMailboxImpl(Thread.currentThread())); ... this.mailboxProcessor = new MailboxProcessor 传入 MailboxProcessor。 在 StreamTask 启动时,会调用 MailboxProcessor 的核心方法。
MailboxProcessor还对外提供了MailboxExecutor,其他组件可以利用MailboxExecutor来提交事件。 执行流程主流程在创建StreamTask时,会创建mailboxProcessor,同时也会持有mainMailboxExecutor。 ();可以看到这里将processInput作为MailboxDefaultAction传入MailboxProcessor。 在StreamTask启动时,会调用MailboxProcessor的核心方法。 接着创建了MailboxController,它用于MailboxDefaultAction与MailboxProcessor的交互。
进行反序列化生成相应的StreamTask,而在实例化StreamTaskl的过程中,将当前线程传递给了StreamTask,并进一步传递了TaskMailbox,并调用StreamTask# invoke,执行MailboxProcessor 又用这个mailbox构造了两个同样关键的实例:mailboxProcessor、mainMailboxExecutor。 StreamTask# invoke后,接着再来看看invoke: 很容易的看出, 核心的执行方法是runMailboxLoop, 而StreamTask# runMailboxLoop调用了MailboxProcessor # runMailboxLoop,所以我们来到MailboxProcessor的runMailboxLoop一探究竟, MailboxProcessor 我们最关心的依然是mailbox的传递,这个mailbox 成员便是我们刚刚在StreamTask的构造方法里传递给MailboxProcessor的传参,接着继续看调用方法, 所以,任务线程是在一个循环中,不断的从Mailbox中取出Mail,然后执行,这也和我们在
(); } 我们先来看下在StreamTask的构造方法中对mailboxProcessor的定义: // 创建 mailboxProcessor this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor); 第一个入参为MailboxDefaultAction,第二个入参为一个 接下来我们来看org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop方法: /** 的 default action,也就是调用 processInput() // 这里的defaultAction是在StreamTask的构造方法中的this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor)中的this::processInput。
Mail.java:90) ~[flink-dist_2.11-1.14.4.jar:1.14.4] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking (MailboxProcessor.java:353) ~[flink-dist_2.11-1.14.4.jar:1.14.4] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail (MailboxProcessor.java:317) ~[flink-dist_2.11-1.14.4.jar:1.14.4] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop (MailboxProcessor.java:201) ~[flink-dist_2.11-1.14.4.jar:1.14.4] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep (MailboxProcessor.java:191) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop (MailboxProcessor.java:181) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:424)at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop (MailboxProcessor.java:204)at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java
最后真正执行的是 MailboxProcessor 中的 runMailboxLoop() 方法,也就是上面说的 MailBox 主线程,StreamTask 运行的核心流程也是在这个方法中,其实现如下
operator 的 open 方法 * +----> run() //runMailboxLoop()方法将一直运行,直到没有更多的输入数据 * --------> mailboxProcessor.runMailboxLoop final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter; protected final MailboxProcessor mailboxProcessor; private Long syncSavepointId = null; @Override public final void invoke() CancelTaskException(); } // let the task do its work isRunning = true; runMailboxLoop(); //MailboxProcessor.runMailboxLoop
-1, 354713989 (org.apache.Flink.streaming.runtime.tasks.StreamTask$$Lambda$710) runMailboxLoop:187, MailboxProcessor 1, 1284793893 (org.apache.Flink.streaming.runtime.tasks.StreamTask$$Lambda$713) runMailboxLoop:187, MailboxProcessor 1, 1284793893 (org.apache.Flink.streaming.runtime.tasks.StreamTask$$Lambda$713) runMailboxLoop:187, MailboxProcessor :-1, 33038573 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$718) runMailboxLoop:187, MailboxProcessor :-1, 33038573 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$718) runMailboxLoop:187, MailboxProcessor
我们解决了首次调用 MailboxProcessor.TryReceive 时 CPU 使用率较高的问题。 bool 比较现在使用快速泛型比较(由 Vasily Kirichenko 提供)。