我正在努力完成EDx platform的EFPL -编程反应式系统课程的最后一个作业(名为反应式追随者)。
我能够完成除outgoingFlow之外的所有功能。
在我看来,我应该以某种方式从现有的流程创建一个新的源代码,在阅读了一些内容后,我仍然没有意识到如何执行流程来为新的源代码生成元素。
我尝试过使用mapConcat,但没有成功。
我认为现有的流程是:
eventParserFlow
.via(followersFlow)
.filter(p => isNotified(userId)(p))现有Flow的类型和我试探性的do实现outgoingFlow可以在这里看到:
val eventParserFlow: Flow[ByteString, Event, NotUsed]
val followersFlow: Flow[Event, (Event, Followers), NotUsed]
def outgoingFlow(userId: Int): Source[ByteString, NotUsed] = {
eventParserFlow
.via(followersFlow)
.filter(p => isNotified(userId)(p))
.mapConcat { case (e, _) => e.render }
???
}谁能给我一些读物或例子,告诉我如何在Akka中解决类似的问题?
发布于 2019-07-03 05:12:46
只是一个注释-所以不是解决这类问题的最佳资源。您应该使用相应edx课程中的discussion部分
关于你的问题-我不会给你明确的答案,只是一些提示。
在akka-streams中,你不能仅仅从Flow中创建一个Source。Flow负责转换,而Source创建新事件。在你的赋值中,你只是忘了使用一个可用的值。
class Server (而不是val (inboundSink, broadcastOut) = ... )中的评论,并试图弄清楚每个vals是用来做什么的,以及它们之间以及它们与应用程序本身之间的关系。了解它们在中的类型会很有帮助
这些提示应该足以理解如何实现outgoingFlow,即Source[ByteString, NotUsed]
https://stackoverflow.com/questions/56858280
复制相似问题