我们正在将许多批处理作业块批处理转换为远程块批处理,因为我们需要能够在一个定义的时间框架内处理多达100万块块。我们已经在运行作业,但是我们有一些与性能相关的问题。
我们的工作每次输入/块处理时间相当长,通常只有几秒钟。
我们在从属端使用以下配置:
<bean id="ourTaskExecutor" class="...ThreadPoolTaskExecutor">
<property name="corePoolSize" value="16" />
<property name="maxPoolSize" value="16" />
<property name="queueCapacity" value="256" />
</bean>
<int-jms:inbound-channel-adapter
id="JobJmsRequests"
connection-factory="jmsConnectionFactory"
channel="requests.chunking"
destination-name="requests.chunking" >
<int:poller task-executor="ourTaskExecutor"
fixed-delay="50"
max-messages-per-poll="4"
receive-timeout="50"/>
</int-jms:inbound-channel-adapter>
<int-jms:outbound-channel-adapter
id="JobJmsReplies"
connection-factory="jmsConnectionFactory"
destination-name="replies.chunking"
channel="replies.chunking"/>
<int:service-activator
id="JobActivator"
input-channel="requests.chunking"
output-channel="replies.chunking"
ref="JobChunkHandler" method="handleChunk"/> 我们已经将主节点设置为64块的节流,因此任何时候都不应该处理超过64个块。
我们的假设是,轮询程序将接收到的消息交给任务执行器,以便对其进行处理,但在任务执行器中只可能有超过64条消息,任务执行器有容纳256+16消息的空间。
然而,我们看到的异常情况是队列容量已经达到,任务被任务执行器拒绝。我们认为这是因为任务执行器仅由轮询程序使用,并在轮询线程上处理所获得的请求。由于处理时间很长,我们缓慢地用轮询任务淹没任务执行器。并在一个线程中处理接收到的所有信息)。
我们无法完全解释洪水的原因,因为我们使用了接收超时和延迟,因此我们的理解是,在触发新的投票之前,应该先进行处理。然而,情况似乎并非如此。
我们基本上要配置的是以下内容:
只应同时处理16条消息。附加消息将被排队(使用相当小的队列,比如池的大小)。当池已满时,我们希望轮询程序被阻塞,直到线程池中的空间再次可用为止。我们不想让一个奴隶急于去做太多的工作。
我们希望在不同的线程上对消息进行轮询和执行。这是可能的吗?如何做到这一点?
发布于 2017-11-03 16:21:30
使用调用方运行或调用方阻止RejectedExecutionHandler。
也就是说,你不想要内存块,它们将在系统故障中丢失。
您应该使用消息驱动的入站通道适配器,并将并发设置为要处理的并发块数。
https://stackoverflow.com/questions/47099651
复制相似问题