为了解决以下问题,我尝试使用akka-streams和akka-http:
为了将传入的连接合并到一个输出中,我提出了以下代码:
val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit b: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val merge = b.add(MergePreferred[IncomingConnection](1))
val inA: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8200)
val inB: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8201)
inA ~> merge.preferred
inB ~> merge.in(0)
merge.out ~> Sink.foreach(println)
ClosedShape
}).run()所以,我有一个来自A和B的IncomingConnection的来源。
现在我想以某种方式处理它们,产生响应并发送响应到相应的连接。
也许有更好的方法来存档所有这些东西,但我找不到任何例子来解决这样的问题,在文档或其他人的问题。
而且,我想这个问题是很常见的。
提前谢谢你的帮助。
发布于 2015-12-16 12:47:33
使用IncomingConnection对象处理请求和返回响应的方式是使用handleXXX方法。基于文档和api接口的实例
同步函数
//this processor contains your logic for converting a request into a response
def requestProcessor(httpRequest : HttpRequest) : HttpResponse = ???
val connectionSink = Sink.foreach[IncomingConnection] { conn =>
conn.handleWithSyncHandler(requestProcessor)
}然后,可以在图形构造中使用此connectionSink:
inA ~> merge.preferred
inB ~> merge.in(0)
merge.out ~> connectionSink异步函数
同样,如果您的处理器是异步的:
val asyncReqProcessor(httpRequest : HttpRequest) : Future[HttpResponse] = ???
val connectionSink =
Sink.foreach[IncomingConnection](_ handleWithAsyncHandler asyncProcessor)流
最后,您还可以使用一个流(我喜欢的方法):
val flowReqProcessor : Flow[HttpRequest, HttpResponse,_] = ???
val connectionSink =
Sink.foreach[IncomingConnection](_ handleWith flowReqProcessor )我最喜欢的一个关于流的特性是,您可以在不同的流中重复使用它们。因此,flowReqProcessor可以是val而不是def。
https://stackoverflow.com/questions/34310477
复制相似问题