1. 概述
本文主要分享 任务批处理。Eureka-Server 集群通过任务批处理同步应用实例注册实例,所以本文也是为 Eureka-Server 集群同步的分享做铺垫。
本文涉及类在 com.netflix.eureka.util.batcher 包下,涉及到主体类的类图如下( 打开大图 ):

推荐 Spring Cloud 书籍:
推荐 Spring Cloud 视频:
任务执行的整体流程如下( 打开大图 ):

acceptorQueue ),重新处理队列( reprocessQueue )。processingOrder )
* 粉线:接收线程( Runner )将重新执行队列,接收队列提交到待执行队列。workQueue )
* 粉线:接收线程( Runner )将待执行队列的任务根据参数( maxBatchingSize )将任务合并成批量任务,调度( 提交 )到工作队列。
* 黄线:执行器的工作线程池,一个工作线程可以拉取一个批量任务进行执行。com.netflix.eureka.util.batcher.TaskProcessor ,任务处理器接口。接口代码如下:
// ... 省略代码,超过微信文章上限#process(task) 方法,处理单任务。#process(tasks) 方法,处理批量任务。com.netflix.eureka.util.batcher.TaskDispatcher ,任务分发器接口。接口代码如下:
// ... 省略代码,超过微信文章上限#process(…) 方法,提交任务编号,任务,任务过期时间给任务分发器处理。com.netflix.eureka.util.batcher.TaskDispatchers ,任务分发器工厂类,用于创建任务分发器。其内部提供两种任务分发器的实现:
com.netflix.eureka.cluster.ReplicationTaskProcessor ,实现 TaskDispatcher ,Eureka-Server 集群任务处理器。感兴趣的同学,可以点击链接自己研究,我们将在 《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。
调用 TaskDispatchers#createBatchingTaskDispatcher(...) 方法,创建批量任务执行的分发器,实现代码如下:
// TaskDispatchers.java
1: /**
2: * 创建批量任务执行的分发器
3: *
4: * @param id 任务执行器编号
5: * @param maxBufferSize 待执行队列最大数量
6: * @param workloadSize 单个批量任务包含任务最大数量
7: * @param workerCount 任务执行器工作线程数
8: * @param maxBatchingDelay 批量任务等待最大延迟时长,单位:毫秒
9: * @param congestionRetryDelayMs 请求限流延迟重试时间,单位:毫秒
10: * @param networkFailureRetryMs 网络失败延迟重试时长,单位:毫秒
11: * @param taskProcessor 任务处理器
12: * @param <ID> 任务编号泛型
13: * @param <T> 任务泛型
14: * @return 批量任务执行的分发器
15: */
// ... 省略代码,超过微信文章上限调用 TaskDispatchers#createNonBatchingTaskDispatcher(...) 方法,创建单任务执行的分发器,实现代码如下:
1: /**
2: * 创建单任务执行的分发器
3: *
4: * @param id 任务执行器编号
5: * @param maxBufferSize 待执行队列最大数量
6: * @param workerCount 任务执行器工作线程数
7: * @param maxBatchingDelay 批量任务等待最大延迟时长,单位:毫秒
8: * @param congestionRetryDelayMs 请求限流延迟重试时间,单位:毫秒
9: * @param networkFailureRetryMs 网络失败延迟重试时长,单位:毫秒
10: * @param taskProcessor 任务处理器
11: * @param <ID> 任务编号泛型
12: * @param <T> 任务泛型
13: * @return 单任务执行的分发器
14: */
15: public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
16: int maxBufferSize,
17: int workerCount,
18: long maxBatchingDelay,
19: long congestionRetryDelayMs,
20: long networkFailureRetryMs,
21: TaskProcessor<T> taskProcessor) {
22: // 创建 任务接收执行器
23: final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
24: id, maxBufferSize, /* workloadSize = 1 */1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
25: );
26: final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
27: return new TaskDispatcher<ID, T>() {
28: @Override
29: public void process(ID id, T task, long expiryTime) {
30: acceptorExecutor.process(id, task, expiryTime);
31: }
32:
33: @Override
34: public void shutdown() {
35: acceptorExecutor.shutdown();
36: taskExecutor.shutdown();
37: }
38: };
39: }#createBatchingTaskDispatcher(…) 只差 workloadSize = 1 参数。在 「5. 创建任务接收器」 详细解析。#createBatchingTaskDispatcher(…) 一样。com.netflix.eureka.util.batcher.AcceptorExecutor ,任务接收执行器。创建构造方法代码如下:
// ... 省略代码,超过微信文章上限com.netflix.eureka.util.batcher.TaskExecutors ,任务执行器。其内部提供创建单任务和批量任务执行器的两种方法。TaskExecutors 构造方法如下:
// ... 省略代码,超过微信文章上限workerThreads 属性,工作线程池。工作任务队列会被工作线程池并发拉取,并发执行。com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnableFactory ,创建工作线程工厂接口。单任务和批量任务执行器的工作线程实现不同,通过自定义工厂实现类创建。调用 TaskExecutors#batchExecutors(...) 方法,创建批量任务执行器。实现代码如下:
/**
* 创建批量任务执行器
*
* @param name 任务执行器名
* @param workerCount 任务执行器工作线程数
* @param processor 任务处理器
* @param acceptorExecutor 接收任务执行器
* @param <ID> 任务编号泛型
* @param <T> 任务泛型
* @return 批量任务执行器
*/
// ... 省略代码,超过微信文章上限com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.BatchWorkerRunnable ,批量任务工作线程。调用 TaskExecutors#singleItemExecutors(...) 方法,创建批量任务执行器。实现代码如下:
/**
* 创建单任务执行器
*
* @param name 任务执行器名
* @param workerCount 任务执行器工作线程数
* @param processor 任务处理器
* @param acceptorExecutor 接收任务执行器
* @param <ID> 任务编号泛型
* @param <T> 任务泛型
* @return 单任务执行器
*/
// ... 省略代码,超过微信文章上限com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.SingleTaskWorkerRunnable ,单任务工作线程。com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable ,任务工作线程抽象类。BatchWorkerRunnable 和 SingleTaskWorkerRunnable 都实现该类,差异在 #run() 的自定义实现。WorkerRunnable 实现代码如下:
// ... 省略代码,超过微信文章上限com.netflix.eureka.util.batcher.TrafficShaper ,网络通信整形器。当任务执行发生请求限流,或是请求网络失败的情况,则延时 AcceptorRunner 将任务提交到工作任务队列,从而避免任务很快去执行,再次发生上述情况。TrafficShaper 实现代码如下:
// ... 省略代码,超过微信文章上限#registerFailure(…) ,在任务执行失败时,提交任务结果给 TrafficShaper ,记录发生时间。在 「10. 任务执行器【执行任务】」 会看到调用该方法。#transmissionDelay(…) ,计算提交延迟,单位:毫秒。「9. 任务接收线程【调度任务】」 会看到调用该方法。调用 AcceptorExecutor#process(...) 方法,添加任务到接收任务队列。实现代码如下:
// AcceptorExecutor.java
// ... 省略代码,超过微信文章上限com.netflix.eureka.util.batcher.TaskHolder ,任务持有者,实现代码如下:
// ... 省略代码,超过微信文章上限后台线程执行 AcceptorRunner#run(...) 方法,调度任务。实现代码如下:
// ... 省略代码,超过微信文章上限#drainInputQueues() 方法,循环处理完输入队列( 接收队列 + 重新执行队列 ),直到有待执行的任务。实现代码如下:
// ... 省略代码,超过微信文章上限pendingTasks ) 里。processingOrder ) 的头部。效果如下图:

pendingTasks )已满,清空重新执行队列( processingOrder ),放弃较早的任务。reprocessQueue ) 和接收队列( acceptorQueue )为空pendingTasks )不为空reprocessQueue )。实现代码如下:
// ... 省略代码,超过微信文章上限acceptorQueue ),实现代码如下:
// ... 省略代码,超过微信文章上限acceptorQueue ) 拉取任务 10 ms。若拉取到,添加到待执行队列( processingOrder )。scheduleTime )。scheduleTime 小于当前时间,不重新计算,即此时需要延迟等待调度。scheduleTime 大于等于当前时间,配合 TrafficShaper#transmissionDelay(…) 重新计算。scheduleTime 小于当前时间,执行任务的调度。#assignBatchWork() 方法,调度批量任务。实现代码如下:
// ... 省略代码,超过微信文章上限#hasEnoughTasksForNextBatch() 方法,判断是否有足够任务进行下一次批量任务调度:1)待执行任务( processingOrder )映射已满;或者 2)到达批量任务处理最大等待延迟。实现代码如下:
// ... 省略代码,超过微信文章上限holders )。? 你会发现,本文说了半天的批量任务,实际是 List<taskholder></taskholder哈。batchWorkRequests ) 。在任务执行器的批量任务执行器,每次执行时,发出 batchWorkRequests 。每一个信号量需要保证获取到一个批量任务。#assignSingleItemWork() 方法,调度单任务。#assignSingleItemWork() 方法,调度单任务,和 #assignBatchWork() 方法类似。实现代码如下:
// ... 省略代码,超过微信文章上限totalItems )等于当前待执行队列( processingOrder )的任务数,意味着:1)任务执行器无任务请求,正在忙碌处理之前的任务;或者 2)任务延迟调度。睡眠 10 秒,避免资源浪费。批量任务工作后台线程( BatchWorkerRunnable )执行 #run(...) 方法,调度任务。实现代码如下:
//
// ... 省略代码,超过微信文章上限getWork() 方法,获取一个批量任务直到成功。实现代码如下:
// ... 省略代码,超过微信文章上限batchWorkQueue ) 和单任务工作队列( singleItemWorkQueue ) 是不同的队列。TaskDispatcher#requestWorkItems() 方法,发起请求信号量,并获得批量任务的工作队列。实现代码如下:
// TaskDispatcher.java // ... 省略代码,超过微信文章上限#getTasksOf(...) 方法,获得实际批量任务。实现代码如下:
// ... 省略代码,超过微信文章上限Congestion 或 TransientError ,调用 AcceptorExecutor#reprocess(...) 提交整个批量任务重新处理,实现代码如下:
// AcceptorExecutor.java // ... 省略代码,超过微信文章上限单任务工作后台线程( SingleTaskWorkerRunnable )执行 #run(...) 方法,调度任务,和 BatchWorkerRunnable#run(...) 基本类似,就不啰嗦了。实现代码如下:
@Override
// SingleTaskWorkerRunnable.java
// ... 省略代码,超过微信文章上限