首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >消费者/生产者AWS SQS akka scala和synchrone使用者

消费者/生产者AWS SQS akka scala和synchrone使用者
EN

Stack Overflow用户
提问于 2016-06-17 13:06:31
回答 2查看 778关注 0票数 0

我的应用程序有生产者和消费者。我的制片人不定期地发布信息。有时我的队列将是空的,有时我会收到一些消息。我希望让我的消费者收听队列,当消息在其中时,接受它并处理这条消息。这个过程可能需要几个小时,如果消费者还没有完成当前消息的处理,我不希望他接收队列中的另一条消息。

我认为AKKA和AWS可以满足我的需求。通过阅读文献和例子,阿克卡-骆驼似乎可以简化我的工作。

我在github上找到这个示例

我更感兴趣的是消费者的配置( Thread.sleep只是模拟我的处理):

代码语言:javascript
复制
class MySqsConsumer extends Consumer {

  //The SQS URI is an in-only message exchange (autoAck=true)
  override def endpointUri = "aws-sqs://sqs-akka-camel?  amazonSQSClient=#client"

  override def receive = {
    case msg: CamelMessage => {
      println("Start received %s" format msg.bodyAs[String])
      Thread.sleep(4000)
      println("Stop received %s" format msg.bodyAs[String])

    }

  }
 }

StackTrace:

代码语言:javascript
复制
...
14:38:53.335 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World1!]
14:38:54.051 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World2!]
14:38:54.753 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World3!]
Start received Hello World1!
Stop received Hello World1!
Start received Hello World2!
Stop received Hello World2!
Start received Hello World3!
...

这里的问题是akka-camel在完成接收方法之前从队列中读取消息。

:在接收新消息之前,我如何让我的消费者等待过程的结束?如果我使用错误的工具/库,你能引导我使用新的工具吗?

EN

回答 2

Stack Overflow用户

发布于 2016-06-17 20:39:12

不确定是否使用akka流,但如果使用,则将源发送到Sink.queue

代码语言:javascript
复制
val graph=yourSource.to(Sink.queue)

当物化(graph.run)返回一个SinkQueue时,您可以手动从它中提取项目。它将使用背压机制,所以你不必担心如果你不拉物品会发生什么。

注意:我故意忽略了仿制药!但别忘了他们。

票数 1
EN

Stack Overflow用户

发布于 2016-11-04 23:03:45

据我所知,你做不到。骆驼一直在消耗队列中的信息。

您可以设置override def autoAck = false --在确认当前消息之前,它不会向receive()发送另一条消息。但是这个“隐藏”的骆驼演员仍然在消费。

也许您可以提供一个定制的amazonSQSClient并以某种方式控制它。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37882356

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档