首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Java RabbitMQ + AMQP阻塞生产者一段时间(锁定)

Java RabbitMQ + AMQP阻塞生产者一段时间(锁定)
EN

Stack Overflow用户
提问于 2013-07-18 13:48:41
回答 1查看 1.5K关注 0票数 1

问题:我们有两个或三个应用程序实例。每个实例都有一个生产者和一个使用者。我们必须安排一些进程,为此我们使用了通用的spring调度程序。此调度程序生成消息并将其抛给"Broker“(RabbitMQ)。在本例中,我们处理相同的数据2或3次,因为每个实例都抛出消息。在第一个生成器抛出消息之前,如何阻止实例的生成器?

配置:

代码语言:javascript
复制
<!-- RabbitMQ configuration -->
<rabbit:connection-factory
        id="connection" host="${rabbit.host}" port="${rabbit.port}"      username="${rabbit.username}" password="${rabbit.password}"
        channel-cache-size="${rabbit.publisherCacheSize}" virtual-host="${rabbit.virtualHost}" />

<!-- Declare executor pool for worker threads -->
<!-- Ensure that the pool-size is greater than the sum of all number of concurrent consumers from rabbit that use this pool to ensure
     you have enough threads for maximum concurrency. We do this by ensuring that this is 1 plus the size of the connection factory cache
     size for all consumers -->
<task:executor id="worker-pool" keep-alive="60" pool-size="${rabbit.consumerChannelCacheSize}" queue-capacity="1000" rejection-policy="CALLER_RUNS"/>


<!-- Message converter -->
<bean id="baseMessageConverter" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound" value="com.company.model.Scraper"/>
</bean>

<bean id="messageConverter" class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
    <constructor-arg index="0" ref="baseMessageConverter"/>
</bean>


<!-- *********************************producer*********************************** -->
<!-- Outbound company Events -->
<int:channel id="producerChannelCompany"/>
<int:gateway id="jobcompanyCompleteEventGateway" service-interface="com.company.eventing.companyEventPublisher"
             default-request-channel="producerChannelCompany"
             default-request-timeout="2000"
             error-channel="errors"/>

<amqp:outbound-channel-adapter id="companyEvents.amqpAdapter" channel="producerChannelCompany"
                               exchange-name="${rabbit.queue.topic}"
                               routing-key="${rabbit.queue.routing.key}"
                               amqp-template="psRabbitTemplate"/>

<rabbit:admin id="psRabbitAdmin" connection-factory="connection" />
<rabbit:template id="psRabbitTemplate" channel-transacted="${rabbit.channelTransacted}" encoding="UTF-8" message-converter="messageConverter" connection-factory="connection"/>
<rabbit:topic-exchange id="ps.topic" name="${rabbit.queue.topic}" durable="true" auto-delete="false"/>



<!-- *********************************consumer*********************************** -->
<rabbit:queue id="ps.queue" name="${rabbit.queue}"  auto-delete="false" durable="true" exclusive="false"  />


<!-- Exchange to queue binding -->
<rabbit:topic-exchange id="ps.topic" name="${rabbit.queue.topic}" durable="true" auto-delete="false" >
    <rabbit:bindings>
        <rabbit:binding queue="${rabbit.queue}" pattern="${rabbit.queue.pattern}"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>

<!-- Configuration for consuming company Complete events -->
<amqp:inbound-channel-adapter id="companyAdapter"
                              channel="companyCompleteEventChannel"
                              queue-names="${rabbit.queue}"
                              channel-transacted="${rabbit.channelTransacted}"
                              prefetch-count="${rabbit.prefetchCount}"
                              concurrent-consumers="${rabbit.concurrentConsumers}"
                              connection-factory="connection"
                              message-converter="messageConverter"
                              task-executor="worker-pool"
                              error-channel="errors"/>



<int:channel id="companyCompleteEventChannel"/>
<int:service-activator id="companyCompleteActivator" input-channel="companyCompleteEventChannel"
                       ref="companyEventHandler" method="runScraper"/>

<bean id="jvmLauncher" class="com.app.company.jvm.JvmLauncher" />
<!-- company Event handler -->
<bean id="companyEventHandler" class="com.app.company.eventing.consumer.companyEventHandler" depends-on="jvmLauncher">
    <!--<property name="scriptHelper" ref="scriptHelper"/>-->
    <property name="jvmLauncher" ref="jvmLauncher" />
    <property name="defaultMemoryOptions" value="${company.memory.opts}"/>
    <property name="defaultMemoryRegex" value="${company.memory.regex}"/>
</bean>


<!-- ERRORS -->
<int:channel id="errors"/>
<int:service-activator id="psErrorLogger" input-channel="errors" ref="psloggingHandler"/>

<bean id="psloggingHandler" class="org.springframework.integration.handler.LoggingHandler">
    <constructor-arg index="0" value="DEBUG"></constructor-arg>
    <!-- <property name="loggerName" value="com.app.travelerpayments.loggingHandler"/> -->
</bean>
EN

回答 1

Stack Overflow用户

发布于 2013-07-18 18:11:46

还不清楚您有什么体系结构,但是如果所有实例都使用来自同一个队列的消息,则每个消息只会被使用一次(除非使用者请求)。我想这是在你的情况下使用AMQP的最好方法。如果我遗漏了什么,请澄清你的问题。

使用-la扇出消息传递时,当每个实例都有自己的队列时,您希望通过自己的消息堆栈来控制消息传递(当然,在几乎所有情况下都是个坏主意),为什么不让所有实例侦听受限于散列exchange的个人队列,并将此交换用于控制消息。您可以告诉实例何时停止或开始消费、刷新它们的队列、计划重新启动等等。

注意,还可以通过特定的路由键使用主题交换和绑定队列,例如“控制”。

其想法是发送who is free请求,选择随机的免费工作人员并向其发送有效负载。您可以使用特定的路由密钥,也可以只将有效负载发布到与队列名称相同的路由密钥的默认交换(默认情况下,队列以与队列名称相同的路由键进行默认交换,请参见违约交换文档中的RabbitMQ部分)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/17725303

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档