首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡给websocket的消息

卡夫卡给websocket的消息
EN

Stack Overflow用户
提问于 2016-04-01 04:25:56
回答 1查看 1.3K关注 0票数 6

我试图写一个卡夫卡消费者到websocket流使用反应性- Kafka,akka-http和akka-流。

代码语言:javascript
复制
  val publisherActor = actorSystem.actorOf(CommandPublisher.props)
  val publisher = ActorPublisher[String](publisherActor)
  val commandSource = Source.fromPublisher(publisher) map toMessage
  def toMessage(c: String): Message = TextMessage.Strict(c)

  class CommandPublisher extends ActorPublisher[String] {
    override def receive = {
      case cmd: String =>
        if (isActive && totalDemand > 0)
          onNext(cmd)
    }
  }

  object CommandPublisher {
    def props: Props = Props(new CommandPublisher())
  }

  // This is the route 
  def mainFlow(): Route = {
    path("ws" / "commands" ) {
       handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
    } 
  }

从kafka使用者(此处略去),我做了一个publisherActor ! commandString来动态地向websocket添加内容。

但是,当我启动websocket的多个客户端时,我会在后端遇到这个异常:

代码语言:javascript
复制
[ERROR] [03/31/2016 21:17:10.335] [KafkaWs-akka.actor.default-dispatcher-3][akka.actor.ActorSystemImpl(KafkaWs)] WebSocket handler failed with can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
java.lang.IllegalStateException: can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
  at akka.stream.impl.ReactiveStreamsCompliance$.canNotSubscribeTheSameSubscriberMultipleTimesException(ReactiveStreamsCompliance.scala:35)
  at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:295)
  ...

不能为所有的websocket客户端使用一个流吗?还是应该为每个客户创建流/publisher参与者?

在这里,我打算向所有websocket客户端发送“当前”/“实时”通知。通知的历史是不相关的,需要对新客户端忽略它。

EN

回答 1

Stack Overflow用户

发布于 2016-04-03 09:33:09

我很抱歉带来坏消息,但看起来这是akka的明确设计。不能按需要对所有客户端重用流的实例。由于Rx模型的结果,扇出必须是“显式的”。

我遇到的例子使用了一个特定于路由器的Flow

代码语言:javascript
复制
  // The flow from beginning to end to be passed into handleWebsocketMessages
  def websocketDispatchFlow(sender: String): Flow[Message, Message, Unit] =
    Flow[Message]
      // First we convert the TextMessage to a ReceivedMessage
      .collect { case TextMessage.Strict(msg) => ReceivedMessage(sender, msg) }
      // Then we send the message to the dispatch actor which fans it out
      .via(dispatchActorFlow(sender))
      // The message is converted back to a TextMessage for serialization across the socket
      .map { case ReceivedMessage(from, msg) => TextMessage.Strict(s"$from: $msg") }

  def route =
    (get & path("chat") & parameter('name)) { name =>
      handleWebsocketMessages(websocketDispatchFlow(sender = name))
    }

以下是对这一问题的讨论:

这正是我不喜欢的阿克卡流,这个明显的扇出。当我从我想要处理的某个地方(例如可观察的或源)接收数据源时,我只想订阅它,我不想关心它是冷的还是热的,或者它是否被其他订阅者订阅。这是我的河流类比。河流不应该关心谁从它喝,而饮酒者不应该关心河流的来源或有多少其他饮酒者。我的示例(相当于Mathias提供的示例)确实共享了数据源,但它只是进行引用计数,您可以有2个订阅者,也可以有100个订阅者,这并不重要。这里我很喜欢,但是如果你不想失去事件,或者你想确保流保持不变,引用计数就不起作用了。但是你使用的是ConnectableObservable,它有connect(): Cancelable,这是非常适合说.一个剧本的LifeCycle插件。如果要为新订阅者重复以前的值,则可以使用BehaviorSubject或ReplaySubject。然后事情就开始了,不需要手动绘制连接图。...(这是https://bionicspirit.com/blog/2015/09/06/monifu-vs-akka-streams.html的).对于具有可观测性和返回可观测性的函数,我们确实有lift,它最接近有名称的东西,并且可以在Monifu中对Subject或其他可观测类型产生很大影响,因为LiftOperators1 (和2),这使得能够在不失去其类型的情况下转换可观测值--这是对RxJava使用lift所做的一种OOP式的改进。 但是,这些函数并不等同于Processor / Subject。区别在于Subject既是消费者又是生产者。这意味着订阅者不能准确地控制数据源何时启动,数据源本质上是热的(意味着多个订阅者共享同一个数据源)。在Rx中,如果您对冷的可观测数据进行建模(这意味着为每个订阅者启动一个新的数据源),这是完全可以接受的。另一方面,在Rx中(通常),拥有只能订阅一次的数据源是不行的,然后就这样了。在Monifu中,这个规则的唯一例外是GroupBy操作符产生的可观测值,但这就像确认规则的异常一样。 这意味着,特别是与Monifu和Reactive协议契约的另一个限制(您不能多次订阅同一个使用者)相结合的是,SubjectProcessor实例是不可重用的。为了使这样的实例可重用,Rx模型需要一个Processor工厂。此外,这意味着每当您想要使用Subject / Processor时,数据源必须自动处于热状态(可在多个订阅者之间共享)。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36348020

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档