首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >当8-10个参与者同时运行时,一些Scala参与者进入等待状态

当8-10个参与者同时运行时,一些Scala参与者进入等待状态
EN

Stack Overflow用户
提问于 2011-01-27 19:49:39
回答 2查看 346关注 0票数 2

在我的模型中,大约有8-9个Scala参与者。每个参与者在RabbitMQ服务器上都有自己的队列

在每个参与者的行为方法中,.It不断地列在队列中,如下所示

代码语言:javascript
复制
def act {
    this ! 1
    loop {
      react {
        case 1 => processMessage(QManager.getMessage); this ! 1
      }
    }
  } 

I a rabbitMq QManager getMessage方法

代码语言:javascript
复制
def getMessage: MyObject = {
    getConnection
    val durable = true
    channel.exchangeDeclare(EXCHANGE, "direct", durable)
    channel.queueDeclare(QUEUE, durable, false, false, null)
    channel queueBind (QUEUE, EXCHANGE, _ROUTING_KEY)
    consumer = new QueueingConsumer(channel)
    channel basicConsume (QUEUE, false, consumer)

    var obj = new MyObject
    try {
      val delivery = consumer.nextDelivery
      val msg = new java.io.ObjectInputStream(
        new java.io.ByteArrayInputStream(delivery.getBody)).readObject()
      obj = msg.asInstanceOf[MyObject]
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
    } catch {
      case e: Exception =>logger.error("error in Get Message", e);endConnection
    }
    endConnection
    obj
  }

所有9个Actor都有自己的对象类型和自己的QManager

在GetMessage中,我使用Rabbitmq QueueConsumer

代码语言:javascript
复制
 val delivery = consumer.nextDelivery

nextDelivery方法返回一个对象,当它在队列中找到时,该方法将参与者置于等待状态

当我启动所有8个演员时,只有4个演员的作品很好,其他的都没有说明。我已经测试了每一个独立运行的角色,它们单独启动时运行良好

当我启动4个以上的角色时,问题就出现了

scala角色的线程化有什么问题吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2011-01-28 02:07:27

免责声明:我是Akka的PO

正如Rex所说,你很忙--在一个共享的线程池上等待、独占线程。

我不知道你是否可以选择测试Akka,但是我们支持AMQP的消费者(和生产者)作为参与者:Akka-AMQP

生成AMQP消息:

代码语言:javascript
复制
    val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
    val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters), producerId = Some("my_producer"))
producer ! Message("Some simple sting data".getBytes, "some.routing.key")

消费AMQP消息:

代码语言:javascript
复制
val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
val myConsumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing.key", actorOf(new Actor { def receive = {
  case Delivery(payload, _, _, _, _, _) => log.info("Received delivery: %s", new String(payload))
}}), None, Some(exchangeParameters)))

另一种选择是使用Akka-Camel与参与者一起使用和生成AMQP消息。

票数 5
EN

Stack Overflow用户

发布于 2011-01-27 22:57:51

你所有的演员都在奔跑;他们从不休息。由于角色是在一个公共线程池中共享的,这意味着幸运的获胜者总是在运行,而不幸的失败者永远得不到任何时间。如果你想拥有一个始终占用整个线程的实体,通常使用java Thread会更好,或者至少使用receive而不是react。您还可以增加执行元池的大小以匹配执行元的数量,但通常情况下,如果有非常多的执行元一直在运行,则应该更仔细地考虑如何构建程序。

票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/4815938

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档