首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >巨像背景任务

巨像背景任务
EN

Stack Overflow用户
提问于 2015-12-16 15:55:45
回答 1查看 308关注 0票数 1

我正在使用Tumblr的新的Colossus框架(http://tumblr.github.io/colossus/)构建一个应用程序。关于它的文档仍然很有限(而且我对Akka还很陌生,这一点也没有帮助),所以我想知道是否有人会介入我的方法是否正确。

该应用程序很简单,由两个关键组件组成:

  • 一个将任务排队到Redis中的瘦web服务层。
  • 后台工作人员,它将轮询同一个Redis实例以查找可用的任务,并在它们可用时处理它们。

我做了一个简单的例子来演示我的并发模型将工作(而且它确实起作用),我在下面发布了这个例子。然而,我想确保没有更多的惯用方法来做到这一点。

代码语言:javascript
复制
import colossus.IOSystem
import colossus.protocols.http.Http
import colossus.protocols.http.HttpMethod.Get
import colossus.protocols.http.UrlParsing._
import colossus.service.{Callback, Service}
import colossus.task.Task

object QueueProcessor {
  implicit val io = IOSystem()   // Create separate IOSystem for worker

  Task { ctx =>
    while(true) {
      // Below code is for testing purposes only. This is where the Redis loop will live, and will use a blocking call to get the next available task
      Thread.sleep(5000)
      println("task iteration")
    }
  }

  def ping = println("starting")  // Method to launch this processor
}


object Main extends App {
  implicit val io = IOSystem()  // Primary IOSystem for the web service

  QueueProcessor.ping  // Launch worker

  Service.serve[Http]("app", 8080) { ctx =>
    ctx.handle { conn =>
      conn.become {
        case req@Get on Root => Callback.successful(req.ok("Here"))
        // The methods to add tasks to the queue will live here
      }
    }
  }
}

我测试了上面的模型,它起作用了。后台循环继续运行,而服务则愉快地接受请求。但是,我认为可能有一种更好的方法来处理工作人员(文档中没有发现),或者可能是Akka Streams?

EN

回答 1

Stack Overflow用户

发布于 2015-12-16 17:19:58

我用的是一些对我来说是半熟的东西。然而,新的答案和反馈仍然受到欢迎!

代码语言:javascript
复制
class Processor extends Actor {
  import scala.concurrent.ExecutionContext.Implicits.global

  override def receive = {
    case "start" => self ! "next"
    case "next" => {
      Future {
        blocking {
          // Blocking call here to wait on Redis (BRPOP/BLPOP)

          self ! "next"
        }
      }
    }
  }
}

object Main extends App {
  implicit val io = IOSystem()

  val processor = io.actorSystem.actorOf(Props[Processor])
  processor ! "start"

  Service.serve[Http]("app", 8080) { ctx =>
    ctx.handle { conn =>
      conn.become {
        // Queue here
        case req@Get on Root => Callback.successful(req.ok("Here\n"))
      }
    }
  }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34316491

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档