我正在使用Tumblr的新的Colossus框架(http://tumblr.github.io/colossus/)构建一个应用程序。关于它的文档仍然很有限(而且我对Akka还很陌生,这一点也没有帮助),所以我想知道是否有人会介入我的方法是否正确。
该应用程序很简单,由两个关键组件组成:
我做了一个简单的例子来演示我的并发模型将工作(而且它确实起作用),我在下面发布了这个例子。然而,我想确保没有更多的惯用方法来做到这一点。
import colossus.IOSystem
import colossus.protocols.http.Http
import colossus.protocols.http.HttpMethod.Get
import colossus.protocols.http.UrlParsing._
import colossus.service.{Callback, Service}
import colossus.task.Task
object QueueProcessor {
implicit val io = IOSystem() // Create separate IOSystem for worker
Task { ctx =>
while(true) {
// Below code is for testing purposes only. This is where the Redis loop will live, and will use a blocking call to get the next available task
Thread.sleep(5000)
println("task iteration")
}
}
def ping = println("starting") // Method to launch this processor
}
object Main extends App {
implicit val io = IOSystem() // Primary IOSystem for the web service
QueueProcessor.ping // Launch worker
Service.serve[Http]("app", 8080) { ctx =>
ctx.handle { conn =>
conn.become {
case req@Get on Root => Callback.successful(req.ok("Here"))
// The methods to add tasks to the queue will live here
}
}
}
}我测试了上面的模型,它起作用了。后台循环继续运行,而服务则愉快地接受请求。但是,我认为可能有一种更好的方法来处理工作人员(文档中没有发现),或者可能是Akka Streams?
发布于 2015-12-16 17:19:58
我用的是一些对我来说是半熟的东西。然而,新的答案和反馈仍然受到欢迎!
class Processor extends Actor {
import scala.concurrent.ExecutionContext.Implicits.global
override def receive = {
case "start" => self ! "next"
case "next" => {
Future {
blocking {
// Blocking call here to wait on Redis (BRPOP/BLPOP)
self ! "next"
}
}
}
}
}
object Main extends App {
implicit val io = IOSystem()
val processor = io.actorSystem.actorOf(Props[Processor])
processor ! "start"
Service.serve[Http]("app", 8080) { ctx =>
ctx.handle { conn =>
conn.become {
// Queue here
case req@Get on Root => Callback.successful(req.ok("Here\n"))
}
}
}
}https://stackoverflow.com/questions/34316491
复制相似问题