首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Akka Streams中的副作用来实现从websocket接收的命令

使用Akka Streams中的副作用来实现从websocket接收的命令
EN

Stack Overflow用户
提问于 2017-03-11 07:24:44
回答 1查看 780关注 0票数 2

我希望能够点击网站上的一个按钮,让它代表一个命令,通过websocket向我的程序发出该命令,让我的程序处理该命令(这将产生副作用),然后将该命令的结果返回给该网站以进行呈现。

websocket将负责更新由users视图中的不同参与者应用的状态更改。

例如:通过网站更改AI指令。这会修改一些值,这些值会被报告回网站。其他用户可能会更改其他AI指令,或者AI会对当前条件变化的位置做出反应,这需要客户端更新屏幕。

我在想,我可以让一个参与者负责使用更改的信息更新客户端,而只是让接收流使用更改更新状态?

这是要使用的库吗?有没有更好的方法来实现我想要的?

EN

回答 1

Stack Overflow用户

发布于 2017-03-11 20:52:54

你可以使用akka-streams和akka-http来做这件事。使用执行元作为处理程序时的示例:

代码语言:javascript
复制
package test

import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash, Status}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.{Flow, Sink, Source, SourceQueueWithComplete}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.pattern.pipe

import scala.concurrent.{ExecutionContext, Future}
import scala.io.StdIn

object Test extends App {
  implicit val actorSystem = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit def executionContext: ExecutionContext = actorSystem.dispatcher

  val routes =
    path("talk") {
      get {
        val handler = actorSystem.actorOf(Props[Handler])
        val flow = Flow.fromSinkAndSource(
          Flow[Message]
            .filter(_.isText)
            .mapAsync(4) {
              case TextMessage.Strict(text) => Future.successful(text)
              case TextMessage.Streamed(textStream) => textStream.runReduce(_ + _)
            }
            .to(Sink.actorRefWithAck[String](handler, Handler.Started, Handler.Ack, Handler.Completed)),
          Source.queue[String](16, OverflowStrategy.backpressure)
            .map(TextMessage.Strict)
            .mapMaterializedValue { queue =>
              handler ! Handler.OutputQueue(queue)
              queue
            }
        )
        handleWebSocketMessages(flow)
      }
    }

  val bindingFuture = Http().bindAndHandle(routes, "localhost", 8080)

  println("Started the server, press enter to shutdown")
  StdIn.readLine()

  bindingFuture
    .flatMap(_.unbind())
    .onComplete(_ => actorSystem.terminate())
}

object Handler {
  case object Started
  case object Completed
  case object Ack
  case class OutputQueue(queue: SourceQueueWithComplete[String])
}

class Handler extends Actor with Stash {
  import context.dispatcher

  override def receive: Receive = initialReceive

  def initialReceive: Receive = {
    case Handler.Started =>
      println("Client has connected, waiting for queue")
      context.become(waitQueue)
      sender() ! Handler.Ack

    case Handler.OutputQueue(queue) =>
      println("Queue received, waiting for client")
      context.become(waitClient(queue))
  }

  def waitQueue: Receive = {
    case Handler.OutputQueue(queue) =>
      println("Queue received, starting")
      context.become(running(queue))
      unstashAll()

    case _ =>
      stash()
  }

  def waitClient(queue: SourceQueueWithComplete[String]): Receive = {
    case Handler.Started =>
      println("Client has connected, starting")
      context.become(running(queue))
      sender() ! Handler.Ack
      unstashAll()

    case _ =>
      stash()
  }

  case class ResultWithSender(originalSender: ActorRef, result: QueueOfferResult)

  def running(queue: SourceQueueWithComplete[String]): Receive = {
    case s: String =>
      // do whatever you want here with the received message
      println(s"Received text: $s")

      val originalSender = sender()
      queue
        .offer("some response to the client")
        .map(ResultWithSender(originalSender, _))
        .pipeTo(self)

    case ResultWithSender(originalSender, result) =>
      result match {
        case QueueOfferResult.Enqueued =>   // okay
          originalSender ! Handler.Ack
        case QueueOfferResult.Dropped =>  // due to the OverflowStrategy.backpressure this should not happen
          println("Could not send the response to the client")
          originalSender ! Handler.Ack
        case QueueOfferResult.Failure(e) =>
          println(s"Could not send the response to the client: $e")
          context.stop(self)
        case QueueOfferResult.QueueClosed =>
          println("Outgoing connection to the client has closed")
          context.stop(self)
      }

    case Handler.Completed =>
      println("Client has disconnected")
      queue.complete()
      context.stop(self)

    case Status.Failure(e) =>
      println(s"Client connection has failed: $e")
      e.printStackTrace()
      queue.fail(new RuntimeException("Upstream has failed", e))
      context.stop(self)
  }
}

这里有很多地方可以调整,但基本思想保持不变。或者,您可以使用GraphStage实现handleWebSocketMessages()方法所需的Flow[Message, Message, _]。上面使用的所有内容也在akka-streams文档中进行了详细描述。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42729382

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档