首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >可以发出值的akka.streams.Source (类似于monix.BehaviorSubject)

可以发出值的akka.streams.Source (类似于monix.BehaviorSubject)
EN

Stack Overflow用户
提问于 2020-03-05 11:45:56
回答 4查看 635关注 0票数 1

我正在寻找akka.stream.scaladsl.Source构造方法,它允许我简单地从不同的代码位置发出下一个值(例如监视系统事件)。

  • 我需要一些类似于Promise的东西。承诺向Future发出单个值。我需要向Source发出多个值。
  • 就像monix.reactive.subjects.BehaviorSubject.onNext(_)
  • 我不太关心背压。

目前,我已经使用monix & akka-streams (下面的代码)实现了这一点,但我希望应该只有akka-streams解决方案:

代码语言:javascript
复制
import akka.stream.scaladsl.{Flow, Sink, Source}
import monix.reactive.subjects.BehaviorSubject
import monix.execution.Scheduler.Implicits.global

val bs = BehaviorSubject("") //monix subject is sink and source at the same time

//this is how it is currently implemented
def createSource() = { 
    val s1 = Source.fromPublisher(bs.toReactivePublisher) //here we create source from subject
    Flow.fromSinkAndSourceCoupled[String, String](Sink.ignore, s1)
}

//somewhere else in code... some event happened
//this is how it works in monix.
val res:Future[Ack] = bs.onNext("Event #1471 just happened!") //here we emit value
EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2020-03-05 13:48:46

也许你在找演员源

文档中的一个示例:

代码语言:javascript
复制
import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSource

trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol

val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](completionMatcher = {
  case Complete =>
}, failureMatcher = {
  case Fail(ex) => ex
}, bufferSize = 8, overflowStrategy = OverflowStrategy.fail)

val ref = source
  .collect {
    case Message(msg) => msg
  }
  .to(Sink.foreach(println))
  .run()

ref ! Message("msg1")

通过这种方式,您可以通过参与者系统向参与者发送消息,这些消息将从流中的ActorSource中发出。

票数 2
EN

Stack Overflow用户

发布于 2020-03-05 12:55:18

顾名思义,Source抽象提供了使用数据源的API。相反,您需要看一看消耗数据的抽象- SinkSink.foreach操作是您要寻找的,最有可能的是:https://doc.akka.io/docs/akka/current/stream/operators/Sink/foreach.html

在这种情况下,代码看起来应该如下所示:

代码语言:javascript
复制
import akka.stream.scaladsl.{Sink, Source}

val s1 = Source.// your WS akka stream source
s1.runWith(Sink.foreach(write))

希望这能有所帮助!

票数 0
EN

Stack Overflow用户

发布于 2020-03-05 13:07:03

我认为您要寻找的是对接收的每个元素调用给定过程的sink.foreach.it,我认为代码如下所示:

代码语言:javascript
复制
s1.runWith(Sink.foreach(write))

本质上,我们正在做的是,对于流一个源,接收器尝试编写该流的每个元素。

编辑

我想你是在找maybe。它创建了一个源,一旦完成了实现的承诺,就会发出这个value.Check 文档

编辑

futureSource也可以在成功完成给定的未来源的元素后,对其进行work.It流。

如果有帮助请告诉我!!

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60544689

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档