首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >请求-用akka-camel和ActiveMQ回复

请求-用akka-camel和ActiveMQ回复
EN

Stack Overflow用户
提问于 2014-05-02 14:31:20
回答 1查看 1.2K关注 0票数 0

更新:似乎有一个更简单的测试用例不起作用:只是试图通过进程内代理将ActiveMQ生产者的消息发送给ActiveMQ使用者。以下是代码:

代码语言:javascript
复制
val brokerURL = "vm://localhost?broker.persistent=false"
val connectionFactory = new ActiveMQConnectionFactory(brokerURL)
val connection = connectionFactory.createConnection()
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val queue = session.createQueue("foo.bar")
val producer = session.createProducer(queue)
val consumer = session.createConsumer(queue)
val message = session.createTextMessage("marco")

producer.send(message)
val resp = consumer.receive(2000)
assert(resp != null)

我正在尝试使用akka-camel实现一个非常简单的请求应答模式。下面是我的(testbench)代码,它试图直接使用activeMQ发送消息并期望得到响应:

代码语言:javascript
复制
val brokerURL = "vm://localhost?broker.persistent=false"

// create in-process broker, session, queue, etc...
val connectionFactory = new ActiveMQConnectionFactory(brokerURL)
val connection = connectionFactory.createConnection()
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val queue = session.createQueue("myapp.somequeue")
val producer = session.createProducer(queue)
val tempDest = session.createTemporaryQueue()
val respConsumer = session.createConsumer(tempDest)
val message = session.createTextMessage("marco")
message.setJMSReplyTo(tempDest)
message.setJMSCorrelationID("myCorrelationID")

// create actor system with CamelExtension
val camel = CamelExtension(system)
val camelContext = camel.context
camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent(brokerURL))
val listener = system.actorOf(Props[Frontend])

// send a message, expect a response
producer.send(message)
val resp: TextMessage = respConsumer.receive(5000).asInstanceOf[TextMessage]
assert(resp.getText() == "polo")

我为消费者演员尝试了两种不同的方法。第一个比较简单,它尝试使用sender !进行响应。

代码语言:javascript
复制
class Frontend extends Actor with Consumer {
  def endpointUri = "activemq:myapp.somequeue"
  override def autoAck = false
  def receive = {
    case msg: CamelMessage => {
      println("received %s" format msg.bodyAs[String])
      sender ! "polo"
    }
  }
}

第二次尝试使用CamelTemplate进行答复:

代码语言:javascript
复制
class Frontend extends Actor with Consumer {
  def endpointUri = "activemq:myapp.somequeue"
  override def autoAck = false
  def receive = {
    case msg: CamelMessage => {
      println("received %s" format msg.bodyAs[String])
      val replyTo = msg.getHeaderAs("JMSReplyTo", classOf[ActiveMQTempQueue], camelContext)
      val correlationId = msg.getHeaderAs("JMSCorrelationID", classOf[String], camelContext)
      camel.template.sendBodyAndHeader("activemq:"+replyTo.getQueueName(), "polo", "JMSCorrelationID", correlationId)
    }
  }
}

我确实看到了来自我的参与者的接收方法的println()输出,所以ActiveMQ消息进入了参与者,但是我在testbench中的respConsumer.receive()调用中得到了一个超时。我在回复中尝试了很多指定和不指定标题的组合。我还尝试过启用和禁用autoAck

提前谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2014-05-02 19:18:04

结果,我需要在JMS代码中调用connection.start()。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/23430613

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档