我正在使用ColdFusion网关来触发和忘记大量的操作。要做到这一点,我有一个循环,在最后使用一个SendGatewayMessage()进行查询。但是,我循环的查询可能会变得非常大。(100.000+记录)
为了防止操作丢失,我增加了队列大小和线程数。
由于操作仍然丢失,所以在SendGatewayMessage()之前包括了一个循环,如下所示:
<cfloop condition="#gatewayService.getQueueSize()# GTE #gatewayService.getMaxQueueSize()#">
<cfset guardianCount = guardianCount+1>
</cfloop>
<cflog file="gatewayGuardian" text="#i# waited for #guardianCount# iterations. Queuesize:#gatewayService.getQueueSize()#">
<cfset SendGatewayMessage("EventGateway",eventData)>(关于gatewayService类here的更多信息)
这或多或少是可以接受的,因为我可以将请求超时增加到几个小时(!),但我仍然在寻找一种更有效的方法来减缓消息发送到队列的速度,希望整个进程运行得更快,同时减少服务器资源的压力。
有什么建议吗?对于进一步增加排队人数的后果,有何想法?
发布于 2016-11-17 16:05:09
现在,我使用应用程序变量来跟踪整个作业中的记录、已处理的批数和已处理的记录数。在作业开始时,我有一段代码来启动以下所有变量:
<cfif not structKeyExists(application,"batchNumber") or application.batchNumber
eq 0 or application.batchNumber eq "">
<cfset application.batchNumber = 0>
<cfset application.recordsToDo = 0>
<cfset application.recordsDone = 0>
<cfset application.recordsDoneErrors = 0>
</cfif>之后,我在一个查询中设置所有记录,并确定我们需要在当前批处理中处理的查询中的哪些记录。批处理中的记录量由记录总数和最大队列大小决定。这样,每一批将不会占用超过一半的队列。这可以确保作业不会干扰其他操作或作业,并且初始请求不会超时。
<cfset application.recordsToSync = qryRecords.recordcount>
<cfif not structKeyExists(application,"recordsPerBatch") or application.recordsPerBatch eq "" or application.recordsPerBatch eq 0>
<cfset application.recordsPerBatch = ceiling(application.recordsToDo/(ceiling(application.recordsToDo/gatewayService.getMaxQueueSize())+1))>
</cfif>
<cfset startRow = (application.recordsPerBatch*application.batchNumber)+1>
<cfset endRow = startRow + application.recordsPerBatch-1>
<cfif endRow gt application.recordsToDo>
<cfset endRow = application.recordsToDo>
</cfif>然后,我使用from/to循环循环查询,以触发网关事件。我保留了守护者,这样就不会有记录丢失,因为排队的人已经排满了。
<cfloop from="#startRow#" to="#endRow#" index="i">
<cfset guardianCount = 0>
<!--- load all values from the record into a struct --->
<cfset stRecordData = structNew()>
<cfloop list="#qryRecords.columnlist#" index="columnlabel">
<cfset stRecordData[columnlabel] = trim(qryRecords[columnlabel][i])>
</cfloop>
<cfset eventData = structNew()>
<cfset eventData.stData = stRecordData>
<cfset eventData.action = "bigJob">
<cfloop condition="#gatewayService.getQueueSize()# GTE #gatewayService.getMaxQueueSize()#">
<cfset guardianCount = guardianCount++>
</cfloop>
<cfset SendGatewayMessage("eventGateway",eventData)>
</cfloop>每当记录完成时,我都有一个函数来检查已完成的数量和要执行的记录的数量。当他们一样的时候,我就完了。否则,我们可能需要启动一个新的批。注意,查看我们是否完成的检查是在一个集群中进行的,但是实际的事件post不是。这是因为,否则,当您发布的事件无法读取您在锁中使用的变量时,可能会出现死锁。
我希望这是有用的人或其他人有一个更好的想法。
<cflock timeout="30" name="jobResult">
<cfset application.recordsDone++>
<cfif application.recordsDone eq application.recordsToDo>
<!--- We are done. Set all the application variables we used back to zero, so they do not get in the way when we start the job again --->
<cfset application.batchNumber = 0>
<cfset application.recordsToDo = 0>
<cfset application.recordsDone = 0>
<cfset application.recordsPerBatch = 0>
<cfset application.recordsDoneErrors = 0>
<cfset application.JobStarted = 0>
<!--- If the number of records we have done is the same as the number of records in a batch times the current batchnumber plus one, we are done with the batch. --->
<cfelseif application.recordsDone eq application.recordsPerBatch*(application.batchNumber+1)
and application.recordsDone neq application.recordsToDo>
<cfset application.batchNumber++>
<cfset doEventAnnounce = true>
</cfif>
</cflock>
<cfif doEventAnnounce>
<!--- Fire off the event that starts the job. All the info it needs is in the applicationscope. --->
<cfhttp url="#URURLHERE#/index.cfm" method="post">
<cfhttpparam type="url" name="event" value="startBigJob">
</cfhttp>
</cfif>https://stackoverflow.com/questions/40397509
复制相似问题