首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用akka流时的事件顺序

使用akka流时的事件顺序
EN

Stack Overflow用户
提问于 2017-11-11 11:49:23
回答 1查看 487关注 0票数 0

阅读akka的文档,我不太清楚消息的顺序,以及是否可以强制执行。让我用我为聊天服务器编写的一小部分代码来设置我问题的上下文。

代码语言:javascript
复制
def flowShape(user: User) = GraphDSL
  .create(Source.actorRef[ChatMessage](bufferSize = 5, OverflowStrategy.fail)) {
    implicit builder =>
      implicit chatSource =>

      import GraphDSL.Implicits._

      val messageFromOutside = builder.add(Flow[String].map {
        case msg: String => UserTextMessage(user, msg)
        case _ => InvalidMessage
      })

      val merge = builder.add(Merge[ChatMessage](2))
      // UPDATE --> this is where the change comes
      // val merge = builder.add(Concat[ChatMessage](2))

      // val channelActorSink = Sink.actorRefWithAck(channelActor, ActorInitMessage, AckMessage, UserLeft(user))
      val channelActorSink = Sink.actorRef(channelActor, UserLeft(user))

      val actorAsSource = builder.materializedValue.map { actor => UserJoined(user, actor) }

      actorAsSource ~> merge.in(0)
      messageFromOutside.out ~> merge.in(1)
      merge ~> channelActorSink

      FlowShape(messageFromOutside.in, chatSource.out)
}

为了让事情变得简单,我用这个流形状和一个非常简单的源和水槽。就像这样--

代码语言:javascript
复制
val source = Source(List[String]("hi", "hello", "what are you upto", "this is nice"))
val sink = Sink.foreach[ChatMessage] {
  case tm: UserTextMessage => println(s"${tm.user.username} :: ${tm.content}")
  case ul: UserLeft => println(s"${ul.user.username} just left the channel")
  case uj: UserJoined => println(s"${uj.user.username} just joined the channel")
  case _ => println(s"do not know what I just received")
}

val mychatchannel = new Channel(420, myactorsystem)

source.via(mychatchannel.chatFlow(User("sushruta"))).runWith(sink)

现在,我的担心来了。在终端中打印的事件顺序是完全不确定的。我也不知道该怎么解决。这是我得到的输出--

代码语言:javascript
复制
[INFO] [11/10/2017 17:42:20.431] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/10/2017 17:42:20.441] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] received a user joined message
[INFO] [11/10/2017 17:42:20.443] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/10/2017 17:42:20.444] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message

输出中缺少第一个消息hihi消息似乎是在打印UserJoin message之前发送的。

我尝试通过使用actorRefWithAck (我在上面的代码中注释掉)来修复它(并在消息传递方面添加一些安全性)。它给出了类似的输出。

代码语言:javascript
复制
[INFO] [11/11/2017 06:33:03.731] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] channel initialized and ready to take events
[INFO] [11/11/2017 06:33:03.735] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.736] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a user joined message
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-4] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a UserLeft message

显然,似乎正在发生的是,在发送UserJoin消息之前,源正在发送消息。我怎么才能解决这个问题?从概念上讲,我想我希望在源出现之前,但在它实际发送第一条消息之前,立即发送UserJoin message。这有可能吗?

谢谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-11-12 07:22:11

把溪流想象成水管:当有水时,它就会流动。合并运算符不关心来自哪个侧元素。如果你想订购这些输入,你需要告诉Akka,而不是使用Concat。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47237570

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档