首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Akka-stream中从Flow创建Source?(编程反应式系统活动)

如何在Akka-stream中从Flow创建Source?(编程反应式系统活动)
EN

Stack Overflow用户
提问于 2019-07-03 02:29:02
回答 1查看 140关注 0票数 0

我正在努力完成EDx platform的EFPL -编程反应式系统课程的最后一个作业(名为反应式追随者)。

我能够完成除outgoingFlow之外的所有功能。

在我看来,我应该以某种方式从现有的流程创建一个新的源代码,在阅读了一些内容后,我仍然没有意识到如何执行流程来为新的源代码生成元素。

我尝试过使用mapConcat,但没有成功。

我认为现有的流程是:

代码语言:javascript
复制
eventParserFlow
.via(followersFlow)
.filter(p => isNotified(userId)(p))

现有Flow的类型和我试探性的do实现outgoingFlow可以在这里看到:

代码语言:javascript
复制
val eventParserFlow: Flow[ByteString, Event, NotUsed]
val followersFlow: Flow[Event, (Event, Followers), NotUsed]

def outgoingFlow(userId: Int): Source[ByteString, NotUsed] = {
  eventParserFlow
    .via(followersFlow)
    .filter(p => isNotified(userId)(p))
    .mapConcat { case (e, _) => e.render }
  ???
}

谁能给我一些读物或例子,告诉我如何在Akka中解决类似的问题?

EN

回答 1

Stack Overflow用户

发布于 2019-07-03 05:12:46

只是一个注释-所以不是解决这类问题的最佳资源。您应该使用相应edx课程中的discussion部分

关于你的问题-我不会给你明确的答案,只是一些提示。

在akka-streams中,你不能仅仅从Flow中创建一个SourceFlow负责转换,而Source创建新事件。在你的赋值中,你只是忘了使用一个可用的值。

  1. 仔细阅读了class Server (而不是val (inboundSink, broadcastOut) = ... )中的评论,并试图弄清楚每个vals是用来做什么的,以及它们之间以及它们与应用程序本身之间的关系。了解它们在

中的类型会很有帮助

这些提示应该足以理解如何实现outgoingFlow,即Source[ByteString, NotUsed]

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56858280

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档