I have a Grails application that runs a job every day at midnight. In my sample application I have 10000 Person records, and the Quartz job does the following:
package threading

import static grails.async.Promises.task
import static groovyx.gpars.GParsExecutorsPool.withPool

class ComplexJob {
    static triggers = {
        simple repeatInterval: 30 * 1000l
    }

    def execute() {
        if (Person.count == 5000) {
            println "Executing job"
            withPool 10000, {
                Person.listOrderByAge(order: "asc").each { p ->
                    task {
                        log.info "Started ${p}"
                        Thread.sleep(15000l - (-1 * p.age))
                    }.onComplete {
                        log.info "Completed ${p}"
                    }
                }
            }
        }
    }
}
Ignore repeatInterval; it is only there for testing. When the job executes, I get the following exception:
2014-11-14 16:11:51,880 quartzScheduler_Worker-3 grails.plugins.quartz.listeners.ExceptionPrinterJobListener - Exception occurred in job: Grails Job
org.quartz.JobExecutionException: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000 [See nested exception: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000]
at grails.plugins.quartz.GrailsJobFactory$GrailsJob.execute(GrailsJobFactory.java:111)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
Caused by: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000
at org.grails.async.factory.gpars.LoggingPoolFactory$3.rejectedExecution(LoggingPoolFactory.groovy:100)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at groovyx.gpars.scheduler.DefaultPool.execute(DefaultPool.java:155)
at groovyx.gpars.group.PGroup.task(PGroup.java:305)
at groovyx.gpars.group.PGroup.task(PGroup.java:286)
at groovyx.gpars.dataflow.Dataflow.task(Dataflow.java:93)
at org.grails.async.factory.gpars.GparsPromise.<init>(GparsPromise.groovy:41)
at org.grails.async.factory.gpars.GparsPromiseFactory.createPromise(GparsPromiseFactory.groovy:68)
at grails.async.Promises.task(Promises.java:123)
at threading.ComplexJob$_execute_closure1_closure3.doCall(ComplexJob.groovy:20)
at threading.ComplexJob$_execute_closure1.doCall(ComplexJob.groovy:19)
at groovyx.gpars.GParsExecutorsPool$_withExistingPool_closure2.doCall(GParsExecutorsPool.groovy:192)
at groovyx.gpars.GParsExecutorsPool.withExistingPool(GParsExecutorsPool.groovy:191)
at groovyx.gpars.GParsExecutorsPool.withPool(GParsExecutorsPool.groovy:162)
at groovyx.gpars.GParsExecutorsPool.withPool(GParsExecutorsPool.groovy:136)
at threading.ComplexJob.execute(ComplexJob.groovy:18)
at grails.plugins.quartz.GrailsJobFactory$GrailsJob.execute(GrailsJobFactory.java:104)
... 2 more
2014-11-14 16:12:06,756 Actor Thread 20 org.grails.async.factory.gpars.LoggingPoolFactory - Async execution error: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed.
java.lang.IllegalStateException: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed.
at groovyx.gpars.dataflow.expression.DataflowExpression.bind(DataflowExpression.java:368)
at groovyx.gpars.group.PGroup$4.run(PGroup.java:315)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2014-11-14 16:12:06,756 Actor Thread 5 org.grails.async.factory.gpars.LoggingPoolFactory - Async execution error: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed.
java.lang.IllegalStateException: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed.
at groovyx.gpars.dataflow.expression.DataflowExpression.bind(DataflowExpression.java:368)
at groovyx.gpars.group.PGroup$4.run(PGroup.java:315)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
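at java.lang.Thread.run(Thread.java:745)
Even though I call withPool(10000), the thread pool does not seem to be sized to 10000. Can I do this computation (for now it only prints log statements) in chunks? If so, how do I know which item was processed last (that is, where to resume)?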
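Posted on 2014-11-14 23:01:26
Wrapping the processing of every single element in its own task does not look optimal. The standard way to parallelize is to split the whole job into an appropriate number of subtasks, so start by choosing that number. For CPU-bound work you can create N tasks, where N is the number of processors, and then split the work into N subtasks, like this: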
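def persons = Person.listOrderByAge(order: "asc")
int nThreads = Runtime.getRuntime().availableProcessors()
// Round up so collate() yields at most nThreads sublists; collate() needs an int of at least 1
int size = Math.max(1, (persons.size() + nThreads - 1).intdiv(nThreads))
withPool nThreads, {
    persons.collate(size).each { subList ->
        task {
            subList.each { p ->
                ...
            }
        }
    }
}
Posted on 2014-11-15 08:31:46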
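I suspect the withPool() call has no effect, because the tasks most likely run on the default thread pool rather than on the pool created by withPool. Try removing the call to withPool() and see whether the tasks still run.
The groovyx.gpars.scheduler.DefaultPool pool (the default for tasks in GPars) grows with the number of tasks and is capped at 1000 concurrent threads.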
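The rejection in the stack trace can be reproduced on a small scale with a plain java.util.concurrent.ThreadPoolExecutor. This is only a sketch under assumptions: it assumes the default pool behaves like an executor with a bounded maximum size and no work queue (the trace passing through ThreadPoolExecutor.reject suggests this, but the GPars and Grails internals are not shown here), and the limit of 4 is a hypothetical stand-in for the 1000-thread cap.

```groovy
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical small limit standing in for the 1000-thread cap from the stack trace.
int maxThreads = 4

// A pool that starts one thread per task up to maxThreads and has no queue to park extra work in.
def pool = new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>())

// Keeps every worker busy, playing the role of the long Thread.sleep in the job.
def release = new CountDownLatch(1)

int accepted = 0
int rejected = 0
10.times {
    try {
        pool.execute({ release.await() } as Runnable)
        accepted++
    } catch (RejectedExecutionException ignored) {
        // All threads are busy and the pool may not grow any further.
        rejected++
    }
}

release.countDown()
pool.shutdown()
pool.awaitTermination(5, TimeUnit.SECONDS)
println "accepted=${accepted}, rejected=${rejected}"
```

Once every thread is blocked, each further task is rejected rather than queued, which is the pattern the job runs into when it starts 10000 tasks that each sleep for about 15 seconds.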
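I suggest creating a fixed-size pool, for example:
import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup(numberOfThreads)
group.task {...}
Note: I am not familiar with grails.async tasks, only with core GPars tasks, so PGroups may behave slightly differently in grails.async.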
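To make the fixed-size idea concrete without depending on GPars or Grails, here is a minimal sketch that swaps in java.util.concurrent.Executors.newFixedThreadPool for the PGroup. The ages list is a hypothetical stand-in for the Person records, and the per-record work is left as a placeholder; the point is only that a fixed pool queues surplus work instead of rejecting it, and that waiting on each Future shows which chunks have finished.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the 10000 Person records: only their ages.
List<Integer> ages = (1..10000).collect { it % 90 }

int nThreads = Runtime.runtime.availableProcessors()
// Ceiling division, so there are at most nThreads chunks and the size is at least 1.
int chunkSize = Math.max(1, (int) Math.ceil(ages.size() / (double) nThreads))
List<List<Integer>> chunks = ages.collate(chunkSize)

def pool = Executors.newFixedThreadPool(nThreads)

// One task per chunk; tasks beyond nThreads would wait in the pool's queue.
def futures = chunks.collect { List<Integer> chunk ->
    pool.submit({
        chunk.each { age ->
            // per-record work goes here
        }
        chunk.size()   // report how many records this chunk handled
    } as Callable<Integer>)
}

// Future.get() blocks until the corresponding chunk has completed.
int processed = futures.sum { it.get() } as int

pool.shutdown()
pool.awaitTermination(10, TimeUnit.SECONDS)
println "Processed ${processed} records in ${chunks.size()} chunks"
```

Because the chunks keep the order of the sorted list, the index of the last chunk whose Future has completed gives a rough resume point; persisting that index is left out of this sketch.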
https://stackoverflow.com/questions/26933177