问题:我们有两个或三个应用程序实例。每个实例都有一个生产者和一个使用者。我们必须安排一些进程,为此我们使用了通用的spring调度程序。此调度程序生成消息并将其抛给"Broker“(RabbitMQ)。在本例中,我们处理相同的数据2或3次,因为每个实例都抛出消息。在第一个生成器抛出消息之前,如何阻止实例的生成器?
配置:
<!-- RabbitMQ configuration -->
<rabbit:connection-factory
id="connection" host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"
channel-cache-size="${rabbit.publisherCacheSize}" virtual-host="${rabbit.virtualHost}" />
<!-- Declare executor pool for worker threads -->
<!-- Ensure that the pool-size is greater than the sum of all number of concurrent consumers from rabbit that use this pool to ensure
you have enough threads for maximum concurrency. We do this by ensuring that this is 1 plus the size of the connection factory cache
size for all consumers -->
<task:executor id="worker-pool" keep-alive="60" pool-size="${rabbit.consumerChannelCacheSize}" queue-capacity="1000" rejection-policy="CALLER_RUNS"/>
<!-- Message converter -->
<bean id="baseMessageConverter" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
<property name="classesToBeBound" value="com.company.model.Scraper"/>
</bean>
<bean id="messageConverter" class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
<constructor-arg index="0" ref="baseMessageConverter"/>
</bean>
<!-- *********************************producer*********************************** -->
<!-- Outbound company Events -->
<int:channel id="producerChannelCompany"/>
<int:gateway id="jobcompanyCompleteEventGateway" service-interface="com.company.eventing.companyEventPublisher"
default-request-channel="producerChannelCompany"
default-request-timeout="2000"
error-channel="errors"/>
<amqp:outbound-channel-adapter id="companyEvents.amqpAdapter" channel="producerChannelCompany"
exchange-name="${rabbit.queue.topic}"
routing-key="${rabbit.queue.routing.key}"
amqp-template="psRabbitTemplate"/>
<rabbit:admin id="psRabbitAdmin" connection-factory="connection" />
<rabbit:template id="psRabbitTemplate" channel-transacted="${rabbit.channelTransacted}" encoding="UTF-8" message-converter="messageConverter" connection-factory="connection"/>
<rabbit:topic-exchange id="ps.topic" name="${rabbit.queue.topic}" durable="true" auto-delete="false"/>
<!-- *********************************consumer*********************************** -->
<rabbit:queue id="ps.queue" name="${rabbit.queue}" auto-delete="false" durable="true" exclusive="false" />
<!-- Exchange to queue binding -->
<rabbit:topic-exchange id="ps.topic" name="${rabbit.queue.topic}" durable="true" auto-delete="false" >
<rabbit:bindings>
<rabbit:binding queue="${rabbit.queue}" pattern="${rabbit.queue.pattern}"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- Configuration for consuming company Complete events -->
<amqp:inbound-channel-adapter id="companyAdapter"
channel="companyCompleteEventChannel"
queue-names="${rabbit.queue}"
channel-transacted="${rabbit.channelTransacted}"
prefetch-count="${rabbit.prefetchCount}"
concurrent-consumers="${rabbit.concurrentConsumers}"
connection-factory="connection"
message-converter="messageConverter"
task-executor="worker-pool"
error-channel="errors"/>
<int:channel id="companyCompleteEventChannel"/>
<int:service-activator id="companyCompleteActivator" input-channel="companyCompleteEventChannel"
ref="companyEventHandler" method="runScraper"/>
<bean id="jvmLauncher" class="com.app.company.jvm.JvmLauncher" />
<!-- company Event handler -->
<bean id="companyEventHandler" class="com.app.company.eventing.consumer.companyEventHandler" depends-on="jvmLauncher">
<!--<property name="scriptHelper" ref="scriptHelper"/>-->
<property name="jvmLauncher" ref="jvmLauncher" />
<property name="defaultMemoryOptions" value="${company.memory.opts}"/>
<property name="defaultMemoryRegex" value="${company.memory.regex}"/>
</bean>
<!-- ERRORS -->
<int:channel id="errors"/>
<int:service-activator id="psErrorLogger" input-channel="errors" ref="psloggingHandler"/>
<bean id="psloggingHandler" class="org.springframework.integration.handler.LoggingHandler">
<constructor-arg index="0" value="DEBUG"></constructor-arg>
<!-- <property name="loggerName" value="com.app.travelerpayments.loggingHandler"/> -->
</bean>发布于 2013-07-18 18:11:46
还不清楚您有什么体系结构,但是如果所有实例都使用来自同一个队列的消息,则每个消息只会被使用一次(除非使用者请求)。我想这是在你的情况下使用AMQP的最好方法。如果我遗漏了什么,请澄清你的问题。
使用-la扇出消息传递时,当每个实例都有自己的队列时,您希望通过自己的消息堆栈来控制消息传递(当然,在几乎所有情况下都是个坏主意),为什么不让所有实例侦听受限于散列exchange的个人队列,并将此交换用于控制消息。您可以告诉实例何时停止或开始消费、刷新它们的队列、计划重新启动等等。
注意,还可以通过特定的路由键使用主题交换和绑定队列,例如“控制”。
其想法是发送who is free请求,选择随机的免费工作人员并向其发送有效负载。您可以使用特定的路由密钥,也可以只将有效负载发布到与队列名称相同的路由密钥的默认交换(默认情况下,队列以与队列名称相同的路由键进行默认交换,请参见违约交换文档中的RabbitMQ部分)。
https://stackoverflow.com/questions/17725303
复制相似问题